Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
   /**
   * Returns the code that identifies this row distribution.
   *
   * @return The row distribution code (plugin id)
   */
  public String getCode();

  /**
   * Returns the human-readable description of this row distribution.
   *
   * @return The row distribution description (plugin description)
   */
  public String getDescription();

  /**
   * Do the actual row distribution in the step: hand the given row over to
   * the output of the given step according to this distribution's strategy.
   *
   * @param rowMeta the meta-data of the row to distribute
   * @param row the data of the row to distribute
   * @param stepInterface The step to distribute the rows in
   * @throws KettleStepException if the distribution fails (the exact
   *         conditions are implementation-specific)
   */
  public void distributeRow(RowMetaInterface rowMeta, Object[] row, StepInterface stepInterface) throws KettleStepException;

  /**
   * Which mini-icon needs to be shown on the hop?
   *
   * @return the EImage code of the icon to show, or null.
   *         NOTE(review): the original text gave two readings of a null
   *         return value -- "the standard icon needs to be used" and
   *         "you don't want to display an icon" -- confirm which one applies.
   */
  public EImage getDistributionImage();

Example

The following code example describes a distribution algorithm that tries to lower the parallelism or, more accurately, forces asynchronous operation of the target steps, as they will get out of step with each other.  The output row set buffers are filled up one by one: if one is full, the next one is tried.

Code Block

package org.pentaho.di.trans.step;

import java.util.concurrent.TimeUnit;

import org.pentaho.di.core.RowSet;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.gui.PrimitiveGCInterface.EImage;
import org.pentaho.di.core.row.RowMetaInterface;

/**
 * Row distribution that fills the output row sets one at a time: a row is
 * offered to the step's current output row set, and only when that row set
 * does not accept it does the plugin move on to the next one, wrapping
 * around to the first row set after the last.
 */
@RowDistributionPlugin(
    code="Overflow",
    name="Overflow",
    description="When an output row set is full, move to the next one"
    )
public class OverflowRowDistributionPlugin implements RowDistributionInterface {

  /**
   * @return the code of this row distribution, {@code "Overflow"}
   */
  @Override
  public String getCode() {
    return "Overflow";
  }

  /**
   * @return the description of this row distribution
   */
  @Override
  public String getDescription() {
    return "When an output row set is full, move to the next one";
  }

  /**
   * Offers the row to the step's current output row set and, for as long as
   * it is not accepted, advances the step's current output row set number
   * (wrapping to 0 past the last one) and tries again.
   *
   * <p>If the step has been stopped while the row is still undelivered, the
   * method gives up and returns without delivering the row, so that a stopped
   * step does not spin forever on full output buffers.
   *
   * @param rowMeta the meta-data of the row to distribute
   * @param row the data of the row to distribute
   * @param step the step whose output row sets receive the row
   * @throws KettleStepException declared by the interface; not thrown
   *         directly by this implementation
   */
  @Override
  public void distributeRow(RowMetaInterface rowMeta, Object[] row, StepInterface step)
      throws KettleStepException {

    RowSet rowSet = step.getOutputRowSets().get(step.getCurrentOutputRowSetNr());

    // A minimal wait makes the put effectively non-blocking, so a row set
    // that does not accept the row is skipped right away.
    while (!rowSet.putRowWait(rowMeta, row, 1, TimeUnit.NANOSECONDS)) {

      // Without this check the loop would busy-spin indefinitely once the
      // step is stopped and none of the row sets accepts rows any more.
      if (step.isStopped()) {
        return;
      }

      // Move on to the next output row set, wrapping around at the end.
      int nextNr = step.getCurrentOutputRowSetNr() + 1;
      if (nextNr > step.getOutputRowSets().size() - 1) {
        nextNr = 0;
      }
      step.setCurrentOutputRowSetNr(nextNr);
      rowSet = step.getOutputRowSets().get(nextNr);
    }
  }

  /**
   * @return always {@code null}; this distribution supplies no icon of its own
   */
  @Override
  public EImage getDistributionImage() {
    return null;
  }

}