Flume Collector Sink Decorator Plugin Finished

Before Christmas, I posted a blog about my trouble writing a Flume Collector Sink Decorator Plugin.

After doing some research and digging further into the underlying issue, I finally found a solution, which makes me super happy.

The issue I had was caused by this error:

Exception in thread "main" java.lang.UnsupportedClassVersionError: garbagefilter/GarbageFilterDecorator : Unsupported major.minor version 51.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:634)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:186)
at com.cloudera.flume.conf.SourceFactoryImpl.loadPluginBuilders(SourceFactoryImpl.java:160)
at com.cloudera.flume.conf.SourceFactoryImpl.<init>(SourceFactoryImpl.java:126)
at com.cloudera.flume.conf.FlumeBuilder.<init>(FlumeBuilder.java:89)
at com.cloudera.flume.agent.LogicalNodeManager.spawn(LogicalNodeManager.java:75)
at com.cloudera.flume.agent.FlumeNode.setup(FlumeNode.java:529)
at com.cloudera.flume.agent.FlumeNode.main(FlumeNode.java:665)

This happened because I compiled my plugin under Java 1.7 and tried to run the .jar file under Java 1.6. The error did not appear in the Flume log when I ran Flume as a service with the following command, which is why I had no clue what was going on:

$ service flume-node start

It only appeared when I ran it directly on the command line:

$ flume node -n collector1

I am not exactly sure why, though. Anyway, it is now working; simply follow the steps outlined here.
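The "51.0" in the error message is the class file's major version: 51 means the class was compiled for Java 7, while a Java 6 runtime only accepts versions up to 50. Recompiling with javac's -source 1.6 -target 1.6 options is the usual way to target the older runtime. To check a compiled class before deploying it, something like the following minimal sketch could be used. The class name ClassVersionCheck and its method names are made up for illustration; the sketch only reads the standard class file header (4-byte magic number, 2-byte minor version, 2-byte major version).

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flume: reports which Java release
// a compiled .class file targets.
class ClassVersionCheck {

    // Returns the major version stored in the class file header.
    static int majorVersion(byte[] classBytes) {
        if (classBytes.length < 8) {
            throw new IllegalArgumentException("Too short to be a class file");
        }
        ByteBuffer header = ByteBuffer.wrap(classBytes); // big-endian by default
        if (header.getInt() != 0xCAFEBABE) {
            throw new IllegalArgumentException("Missing class file magic number");
        }
        header.getShort();                 // minor version, not needed here
        return header.getShort() & 0xFFFF; // major version as an unsigned value
    }

    // Maps the two versions relevant to this post; others are printed as-is.
    static String describe(int major) {
        switch (major) {
            case 50: return "Java 6";
            case 51: return "Java 7";
            default: return "class file major version " + major;
        }
    }

    // Usage: java ClassVersionCheck path/to/SomeClass.class
    public static void main(String[] args) throws IOException {
        byte[] bytes = Files.readAllBytes(Paths.get(args[0]));
        System.out.println(describe(majorVersion(bytes)));
    }
}
```

Running it against a class extracted from the plugin .jar shows whether the class can be loaded by the JVM that Flume runs on.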

This is my first Java code in the last 5 years, wow. I think I will spend more time writing Flume plugins, as we need to push more jobs to Flume to do some post-processing for us.

Flume Collector Sink Decorator Plugin

In the last few days I have been trying to create a Flume sink decorator plugin to filter out any data that we consider garbage. (It is very disappointing that Flume doesn’t support even very basic filtering.) I followed the steps on the R&D Blog, with slight modifications to the HelloWorldSinkDecorator Java class. The code compiled, but when I plugged it into Flume and enabled it, the collector simply died with no error message whatsoever.
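For readers unfamiliar with the idea, a sink decorator wraps another sink and decides what gets passed on to it. The sketch below shows only that pattern in plain Java. It does not use Flume's API: the Sink interface, the FilteringSink class and the "blank events are garbage" rule are all assumptions made up for illustration, and the real plugin would extend Flume's own decorator class instead.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of the decorator idea only; none of these types come from Flume.
class GarbageFilterSketch {

    // Hypothetical stand-in for a downstream sink.
    interface Sink {
        void append(String event);
    }

    // Decorator: forwards an event downstream only if it is not garbage.
    static class FilteringSink implements Sink {
        private final Sink downstream;

        FilteringSink(Sink downstream) {
            this.downstream = downstream;
        }

        // Assumed rule for this sketch: null or blank events are garbage.
        static boolean isGarbage(String event) {
            return event == null || event.trim().isEmpty();
        }

        @Override
        public void append(String event) {
            if (!isGarbage(event)) {
                downstream.append(event);
            }
        }
    }

    // Pushes events through the filter and returns what reached the downstream sink.
    static List<String> demo(List<String> events) {
        final List<String> kept = new ArrayList<String>();
        Sink collector = new Sink() {
            public void append(String event) {
                kept.add(event);
            }
        };
        Sink filtered = new FilteringSink(collector);
        for (String event : events) {
            filtered.append(event);
        }
        return kept;
    }
}
```

Keeping the garbage rule in its own small method means it can be tested without a running collector.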

I had to stop working on this on Thursday afternoon to join our company’s Xmas party lunch in the city, but leaving this task unfinished before the break makes me very unhappy. I still haven’t found a solution and will have to keep digging when I go back to the office next week.

Flume has had so many problems since the first day I used it, and it is not flexible enough to do lots of simple tasks. I am wondering whether we should be looking for alternatives.

Very interested to see how Flume NG will turn out.

Add ChangeLogable Behaviour To Your Symfony Models

Last week I worked on a task to log backend edits to certain models in Symfony so that admins can keep track of the full history of creates, updates and deletes on tracked models. I did some research and found some plugins that do a similar job, but none of them were up to date, and most of them just didn’t do what we were looking for.

For simplicity, we decided to develop one ourselves for Symfony 1.4 and, for now, only keep track of the columns on the table, i.e. ignore one-to-many and many-to-many relationships. I might turn it into a Symfony plugin in the future, but for the purposes of this post, I will simply outline what I did and how it works. I am sure someone will benefit from the code.

To start with, I needed to enable the behaviour for Propel in the propel.ini configuration file:

propel.builder.addBehaviors = true

Then I created a table with the following schema:

DROP TABLE IF EXISTS `change_log`;
CREATE TABLE `change_log` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `admin_id` smallint(3) unsigned NOT NULL,
  `table` varchar(80) COLLATE utf8_unicode_ci NOT NULL,
  `primary_id` int(11) unsigned NOT NULL,
  `column` varchar(80) COLLATE utf8_unicode_ci NOT NULL,
  `action` varchar(10) COLLATE utf8_unicode_ci NOT NULL,
  `before` text COLLATE utf8_unicode_ci NOT NULL,
  `after` text COLLATE utf8_unicode_ci NOT NULL,
  `created_at` datetime NOT NULL,
  PRIMARY KEY (`id`),
  KEY `admin_search` (`admin_id`,`created_at`),
  KEY `table_search` (`table`,`column`,`created_at`),
  KEY `column_search` (`column`,`primary_id`,`created_at`)
);

and I added the following to Propel’s schema definition:

propel:
  change_log:
    _attributes: { phpName: ChangeLog }
    id: ~
    admin_id: ~
    table: { type: VARCHAR, size: '255', required: true }
    primary_id: { type: INTEGER, size: '11', required: true }
    column: { type: VARCHAR, size: '255', required: true }
    action: { type: VARCHAR, size: '10', required: true }
    before: { type: LONGVARCHAR, required: false }
    after: { type: LONGVARCHAR, required: false }
    created_at: ~

and then regenerated the models by running:

./symfony propel:build-model

Secondly, I created a new behaviour class called “elChangeLogableBehaviour” (“el” stands for “Eric Lin”, by the way):


class elChangeLogableBehaviour {

  // The list of change logs that will record edit operations
  protected $_saveEntries = array();

  // The list of change logs that will record delete operations
  protected $_deleteEntries = array();

  public function preSave($object, $con) {

    // clear the instance pool so that we can get a fresh copy from database
    // in order to compare the before and after values
    call_user_func(array($object->getPeer(), 'clearInstancePool'));

    $modifiedColumns = $object->getModifiedColumns();
    $tableName = constant(get_class($object->getPeer())."::TABLE_NAME");
    $id = (int) $object->getPrimaryKey();

    // retrieve the object with old data ( the data in the database )
    $oldObject = call_user_func_array(array($object->getPeer(), 'retrieveByPk'), array($id));

    // default action is update
    $action = 'update';

    // if there is no record in the database, means it is an insert ( new record )
    if(!$oldObject) {

      $className = get_class($object);
      $oldObject = new $className;
      $action = 'insert';
    }

    // go through each modified column and create one change log each without saving them
    foreach($modifiedColumns as $column) {

      $beforeValue = $oldObject->getByName($column, BasePeer::TYPE_COLNAME);
      $afterValue = $object->getByName($column, BasePeer::TYPE_COLNAME);

      // if values are the same, don't record them
      if($beforeValue == $afterValue) continue;

      // fill in the change_log columns from the values gathered above;
      // the admin id is read from the user session (assumes myUser::getId())
      $changeLog = new ChangeLog();
      $changeLog->setAdminId(sfContext::getInstance()->getUser()->getId());
      $changeLog->setTable($tableName);
      $changeLog->setPrimaryId($id);
      $changeLog->setColumn(call_user_func_array(array($object->getPeer(), 'translateFieldName'), array($column, BasePeer::TYPE_COLNAME, BasePeer::TYPE_FIELDNAME)));
      $changeLog->setAction($action);
      $changeLog->setBefore($beforeValue);
      $changeLog->setAfter($afterValue);
      $changeLog->setCreatedAt(date('Y-m-d H:i:s'));

      $this->_saveEntries[$tableName][$id][$column] = $changeLog;
    }

    return true;
  }

  // Once the save is performed, save all the change logs for the given object
  public function postSave($object, $con) {

    $tableName = constant(get_class($object->getPeer())."::TABLE_NAME");
    if(isset($this->_saveEntries[$tableName])) {

      foreach($this->_saveEntries[$tableName] as $id => $columnChanges) {

        foreach($columnChanges as $column => $changeLog) {

          // need to update the primary id for new records
          if($changeLog->getPrimaryId() == 0) {
            $changeLog->setPrimaryId($object->getPrimaryKey());
          }

          $changeLog->save();
        }
      }
    }
  }

  // Create a change log entry for delete operation
  public function preDelete($object, $con) {

    $tableName = constant(get_class($object->getPeer())."::TABLE_NAME");

    $changeLog = new ChangeLog();
    $changeLog->setAdminId(sfContext::getInstance()->getUser()->getId());
    $changeLog->setTable($tableName);
    $changeLog->setPrimaryId($object->getId());
    $changeLog->setAction('delete');
    $changeLog->setCreatedAt(date('Y-m-d H:i:s'));

    $this->_deleteEntries[$tableName][$object->getId()] = $changeLog;
  }

  // Save the delete change log after delete operation
  public function postDelete($object, $con) {

    foreach($this->_deleteEntries as $changeLogs) {

      foreach($changeLogs as $changeLog) {
        $changeLog->save();
      }
    }
  }
}

I have also updated the myUser class to store the admin_id in the user session so that we can retrieve it easily:

public function logIn($id=0) {

  $this->setAttribute('userId', $id, 'currentUser');
}

public function getId() {

  return $this->getAttribute('userId', 0, 'currentUser');
}

Thirdly, I updated the setup function in my project’s ProjectConfiguration class:

  public function setup() {
    // for compatibility / remove and enable only the plugins you want

    sfPropelBehavior::registerHooks('changelogable', array(
      ':save:pre'    => array('elChangeLogableBehaviour', 'preSave'),
      ':save:post'   => array('elChangeLogableBehaviour', 'postSave'),
      ':delete:pre'  => array('elChangeLogableBehaviour', 'preDelete'),
      ':delete:post' => array('elChangeLogableBehaviour', 'postDelete')
    ));
  }

This registers the hooks with Propel’s models. I also needed to add

require_once dirname(__FILE__) . '/../lib/vendor/symfony/lib/plugins/sfPropelPlugin/lib/addon/sfPropelBehavior.class.php';

at the top of the file, otherwise Symfony complains that the class is missing.

Then I added:

propel.behavior.changelogable.class = lib.behaviour.elChangeLogableBehaviour

to my propel.ini file to enable the new behaviour for my Symfony project.

Finally, I needed to register the new behaviour with each model that I want to track by adding the following line to the end of its model class:

sfPropelBehavior::add('Publication', array('changelogable'));

That’s all. It should now record all changes made to the Propel model when someone changes data in the backend.

If you have any comments or suggestions, please post them in the comments below.