PDI Row Distribution Plugin Development
Introduction
A row distribution plugin allows you to distribute rows of data from one step to another in a specific way.
By default, there are two ways to distribute data (not counting partitioning):
- Distribute rows in a round-robin fashion: each target step (copy) gets one row in turn, so all target steps receive (nearly) equal numbers of rows.
- Copy rows: all target steps (copies) receive every row.
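To make the two default modes concrete, here is a minimal sketch in plain Java. It does not use the PDI API: each target step copy is modelled as a hypothetical list that collects the rows it receives, and the class name DefaultDistributionDemo is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in, not the PDI API: each "target step copy" is a plain
// list that collects the rows it receives.
final class DefaultDistributionDemo {

    // Round-robin: row i goes to target i % n, so targets get (nearly) equal shares.
    static List<List<Object[]>> roundRobin(List<Object[]> rows, int targets) {
        List<List<Object[]>> out = newTargets(targets);
        int current = 0;
        for (Object[] row : rows) {
            out.get(current).add(row);
            current = (current + 1) % targets;
        }
        return out;
    }

    // Copy: every target receives every row.
    static List<List<Object[]>> copy(List<Object[]> rows, int targets) {
        List<List<Object[]>> out = newTargets(targets);
        for (Object[] row : rows) {
            for (List<Object[]> target : out) {
                target.add(row);
            }
        }
        return out;
    }

    private static List<List<Object[]>> newTargets(int n) {
        List<List<Object[]>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(new ArrayList<>());
        }
        return out;
    }
}
```

With seven rows and three targets, roundRobin gives the copies 3, 2 and 2 rows, while copy gives each of the three copies all seven rows.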
Sometimes you need to distribute data according to a more specific rule, such as the free space in an output buffer, the resources available on a machine, or some other criterion. This is where row distribution plugins come in.
RowDistributionInterface
Row distribution plugins need to implement the RowDistributionInterface.
Here are the methods that need to be implemented:
 /**
  * @return The row distribution code (plugin id)
  */
 public String getCode();

 /**
  * @return The row distribution description (plugin description)
  */
 public String getDescription();

 /**
  * Do the actual row distribution in the step
  * @param rowMeta the meta-data of the row to distribute
  * @param row the data of the row to distribute
  * @param stepInterface The step to distribute the rows in
  * @throws KettleStepException
  */
 public void distributeRow(RowMetaInterface rowMeta, Object[] row, StepInterface stepInterface) throws KettleStepException;

 /**
  * Which mini-icon needs to be shown on the hop?
  *
  * @return one of the available EImage codes, or null if the standard icon needs to be used.
  */
 public EImage getDistributionImage();
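The sketch below shows the shape of an implementation without requiring PDI on the classpath. It assumes a hypothetical, simplified interface, SimpleRowDistribution, in which the Kettle types are replaced by a list of BlockingQueue output buffers and the GUI-only getDistributionImage() is left out. The rule it implements (send each row to the buffer with the most free space) is one of the rules mentioned in the introduction; MostFreeSpaceDistribution is an illustrative name, not an existing plugin.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical, simplified stand-in for RowDistributionInterface: RowMetaInterface,
// StepInterface and EImage are replaced by a plain list of output buffers.
interface SimpleRowDistribution {
    String getCode();
    String getDescription();
    void distributeRow(Object[] row, List<BlockingQueue<Object[]>> outputBuffers);
}

// Illustrative rule: send each row to the output buffer that currently has the
// most free space (ties go to the lowest index).
final class MostFreeSpaceDistribution implements SimpleRowDistribution {
    @Override
    public String getCode() {
        return "MostFreeSpace";
    }

    @Override
    public String getDescription() {
        return "Send each row to the output buffer with the most free space";
    }

    @Override
    public void distributeRow(Object[] row, List<BlockingQueue<Object[]>> outputBuffers) {
        int best = 0;
        for (int i = 1; i < outputBuffers.size(); i++) {
            if (outputBuffers.get(i).remainingCapacity() > outputBuffers.get(best).remainingCapacity()) {
                best = i;
            }
        }
        try {
            // Blocks only if even the emptiest buffer is full.
            outputBuffers.get(best).put(row);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while writing a row", e);
        }
    }
}
```

A real plugin would instead take the row sets from the StepInterface passed to distributeRow() and write through the RowSet methods, as the Overflow example does.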
Example
The following code example implements a distribution algorithm that tries to lower the parallelism or, more accurately, forces asynchronous operation of the target steps, since they get out of step with each other. The output row set buffers are filled up one at a time: when one is full, the next one is used.
package org.pentaho.di.trans.step;

import java.util.concurrent.TimeUnit;

import org.pentaho.di.core.RowSet;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.gui.PrimitiveGCInterface.EImage;
import org.pentaho.di.core.row.RowMetaInterface;

@RowDistributionPlugin(
    code = "Overflow",
    name = "Overflow",
    description = "When an output row set is full, move to the next one")
public class OverflowRowDistributionPlugin implements RowDistributionInterface {

  @Override
  public String getCode() {
    return "Overflow";
  }

  @Override
  public String getDescription() {
    return "When an output row set is full, move to the next one";
  }

  @Override
  public void distributeRow(RowMetaInterface rowMeta, Object[] row, StepInterface step) throws KettleStepException {
    // Start with the row set the step is currently writing to.
    RowSet rowSet = step.getOutputRowSets().get(step.getCurrentOutputRowSetNr());
    boolean added = false;
    while (!added) {
      // Try to add the row, waiting at most 1 nanosecond for free space.
      added = rowSet.putRowWait(rowMeta, row, 1, TimeUnit.NANOSECONDS);
      if (added) {
        break;
      }
      // This row set is full: move on to the next one, wrapping around to the first.
      step.setCurrentOutputRowSetNr(step.getCurrentOutputRowSetNr() + 1);
      if (step.getCurrentOutputRowSetNr() > step.getOutputRowSets().size() - 1) {
        step.setCurrentOutputRowSetNr(0);
      }
      rowSet = step.getOutputRowSets().get(step.getCurrentOutputRowSetNr());
    }
  }

  @Override
  public EImage getDistributionImage() {
    // null: show the standard icon on the hop.
    return null;
  }
}
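To see the effect of the Overflow rule without a running transformation, its loop can be simulated in plain Java. In this sketch, a BlockingQueue stands in for a RowSet, offer(row, 1, TimeUnit.NANOSECONDS) plays the role of putRowWait(), and a field replaces the step's current output row set number; OverflowSimulation is a hypothetical name, not part of PDI.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone simulation of the Overflow logic: BlockingQueue stands
// in for RowSet, and 'current' for the step's current output row set number.
final class OverflowSimulation {
    private final List<BlockingQueue<Object[]>> outputs;
    private int current = 0;

    OverflowSimulation(List<BlockingQueue<Object[]>> outputs) {
        this.outputs = outputs;
    }

    void distributeRow(Object[] row) {
        try {
            // Keep writing to the current buffer; when a write times out (buffer full),
            // move on to the next buffer, wrapping around to the first one.
            // Like the plugin, this keeps looping until some buffer has room.
            while (!outputs.get(current).offer(row, 1, TimeUnit.NANOSECONDS)) {
                current = (current + 1) % outputs.size();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while writing a row", e);
        }
    }

    int currentIndex() {
        return current;
    }
}
```

With three buffers of capacity 2 and no consumers, five rows end up split 2, 2 and 1: the first buffer is completely filled before the second one receives anything, which is what pushes the target steps out of step with each other.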
Once installed, the new distribution method is available through the right-click menu on a step.