Need help on TimeBasedDedupOperator


Vivek Bhide
Hi,

I'm trying to understand how TimeBasedDedupOperator works for my streaming
application. I'm using the Malhar dedup example:
https://github.com/apache/apex-malhar/blob/master/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java

I made a few modifications to minimize the output.
Properties:
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.keyExpression</name>
    <value>id</value>
  </property>
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.timeExpression</name>
    <value>eventTime.getTime()</value>
  </property>
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.bucketSpan</name>
    <value>10</value>
  </property>
  <property>
    <name>dt.application.DedupExample.operator.Deduper.prop.expireBefore</name>
    <value>60</value>
  </property>
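
To make the two time settings concrete, here is a rough sketch of the arithmetic they imply. It assumes that bucketSpan and expireBefore are both given in seconds and that buckets are fixed-width slices of the expiry window; the class and method names are made up for illustration, and the operator's real bucket assignment may differ.

```java
// Plain-Java arithmetic only; this does not call any Apex/Malhar API.
class BucketSketch
{
  // Values from the properties above, assumed to be in seconds.
  static final long BUCKET_SPAN_MS = 10 * 1000L;
  static final long EXPIRE_BEFORE_MS = 60 * 1000L;

  // Number of fixed-width buckets that cover the expiry window.
  static long numBuckets()
  {
    return EXPIRE_BEFORE_MS / BUCKET_SPAN_MS;
  }

  // Index of the bucket an event falls into, counted from a chosen start time.
  static long bucketOf(long eventMillis, long startMillis)
  {
    return (eventMillis - startMillis) / BUCKET_SPAN_MS;
  }

  // True if the event is older than the expiry window measured from the latest event.
  static boolean isExpired(long eventMillis, long latestMillis)
  {
    return eventMillis < latestMillis - EXPIRE_BEFORE_MS;
  }

  public static void main(String[] args)
  {
    long first = 1520413075333L; // earliest millis in the console output of this post
    long last = 1520413092321L;  // latest millis in the console output of this post
    System.out.println("buckets: " + numBuckets());
    System.out.println("bucket of last event: " + bucketOf(last, first));
    System.out.println("first event expired: " + isExpired(first, last));
  }
}
```

Under these assumptions there are 6 buckets, and the tuples in the console output of this post span about 17 seconds, so none of them would fall outside the 60-second expiry window.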

Below is the application code:

// Imports as in the linked Malhar example; package names may differ by Malhar version.
import java.util.Date;
import java.util.Random;

import org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;

public class Application implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Test Data Generator Operator
    RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator());

    // Dedup Operator. Configuration through resources/META-INF/properties.xml
    TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator());

    // Console output operator for unique tuples
    ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator());

    // Streams
    dag.addStream("Generator to Dedup", gen.output, dedup.input);

    // Connect Dedup unique to Console
    dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input);

    // Set Attribute TUPLE_CLASS for supplying schema information to the port
    dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class);
  }

  public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator
  {

    public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>();
    private final transient Random r = new Random();
    private int tuplesPerWindow = 100;
    private transient int count = 0;

    @Override
    public void beginWindow(long windowId)
    {
      // Reset the per-window tuple counter
      count = 0;
    }

    @Override
    public void emitTuples()
    {
      // Stop emitting once the per-window limit has been passed
      if (count++ > tuplesPerWindow) {
        return;
      }
      TestEvent event = new TestEvent();
      // Only two possible keys (0 or 1)
      event.id = r.nextInt(2);
      // Event time is the current system time, so it changes from tuple to tuple
      long millis = System.currentTimeMillis();
      event.millis = millis;
      event.setTimeNow(new Date(millis));
//      event.eventTime = new Date( millis - (r.nextInt(60 * 1000)));
      event.eventTime = new Date(millis);
      output.emit(event);
    }
  }

  public static class TestEvent
  {
    private int id;
    private Date timeNow;
    private Date eventTime;
    private long millis;

    public TestEvent()
    {
    }

    public long getMillis()
    {
      return millis;
    }

    public int getId()
    {
      return id;
    }

    public void setId(int id)
    {
      this.id = id;
    }

    public Date getEventTime()
    {
      return eventTime;
    }

    public void setTimeNow(Date timeNow)
    {
      this.timeNow = timeNow;
    }

    public Date getTimeNow()
    {
      return timeNow;
    }

    public void setEventTime(Date eventTime)
    {
      this.eventTime = eventTime;
    }

    @Override
    public String toString()
    {
      return "TestEvent [id=" + id + "; millis = " + millis + "; nowTime=" + timeNow + "; eventTime=" + eventTime + "]";
    }

  }

}

I ran this application from a JUnit test using LocalMode, but the console
output shows duplicate records. I'm trying to understand why duplicate
messages appear on the unique console:
1. Unique: TestEvent [id=1; millis = 1520413075333; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
2. Unique: TestEvent [id=1; millis = 1520413075334; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
3. Unique: TestEvent [id=0; millis = 1520413075363; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
4. Unique: TestEvent [id=0; millis = 1520413075364; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
5. Unique: TestEvent [id=0; millis = 1520413075365; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
6. Unique: TestEvent [id=0; millis = 1520413075366; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
7. Unique: TestEvent [id=0; millis = 1520413075367; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
8. Unique: TestEvent [id=0; millis = 1520413075368; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
9. Unique: TestEvent [id=0; millis = 1520413075369; nowTime=Wed Mar 07 00:57:55 PST 2018; eventTime=Wed Mar 07 00:57:55 PST 2018]
10. Unique: TestEvent [id=1; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
11. Unique: TestEvent [id=0; millis = 1520413082317; nowTime=Wed Mar 07 00:58:02 PST 2018; eventTime=Wed Mar 07 00:58:02 PST 2018]
12. Unique: TestEvent [id=0; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]
13. Unique: TestEvent [id=1; millis = 1520413092321; nowTime=Wed Mar 07 00:58:12 PST 2018; eventTime=Wed Mar 07 00:58:12 PST 2018]

I see a lot of duplicates on the unique port. Did I set any configuration incorrectly?

Any suggestions are appreciated.

Thanks



--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/

Re: Need help on TimeBasedDedupOperator

Bhupesh Chawda
Hi Vivek,

In the case of dedup with expiry, the deduper assumes that the dedup key is bound to the timestamp (the expiry key).
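
One way to read this against the console output in the original post: every tuple reported as unique has a different (id, millis) combination, even though only two ids occur. The sketch below counts distinct tuples both ways using plain Java collections. It only illustrates that reading; it is not the operator's implementation, and the class and method names are made up for the example.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java illustration only; this is NOT how TimeBasedDedupOperator is implemented.
class DedupBindingSketch
{
  // (id, millis) pairs copied from the 13 "Unique" lines in the original post.
  static final long[][] OBSERVED = {
    {1, 1520413075333L}, {1, 1520413075334L},
    {0, 1520413075363L}, {0, 1520413075364L}, {0, 1520413075365L},
    {0, 1520413075366L}, {0, 1520413075367L}, {0, 1520413075368L},
    {0, 1520413075369L}, {1, 1520413082317L}, {0, 1520413082317L},
    {0, 1520413092321L}, {1, 1520413092321L}
  };

  // Counts tuples that are distinct when only the id is compared.
  static int uniqueByKey(long[][] tuples)
  {
    Set<Long> seen = new HashSet<>();
    for (long[] t : tuples) {
      seen.add(t[0]);
    }
    return seen.size();
  }

  // Counts tuples that are distinct when id and timestamp are compared together.
  static int uniqueByKeyAndTime(long[][] tuples)
  {
    Set<String> seen = new HashSet<>();
    for (long[] t : tuples) {
      seen.add(t[0] + "@" + t[1]);
    }
    return seen.size();
  }

  public static void main(String[] args)
  {
    System.out.println("distinct ids: " + uniqueByKey(OBSERVED));
    System.out.println("distinct (id, millis) pairs: " + uniqueByKeyAndTime(OBSERVED));
  }
}
```

Comparing on id alone gives 2 distinct tuples, while comparing on id together with millis gives 13, which matches the number of lines printed on the unique port.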

~ Bhupesh


_______________________________________________________

Bhupesh Chawda

E: [hidden email] | Twitter: @bhupeshsc

www.datatorrent.com  |  apex.apache.org



On Fri, Mar 16, 2018 at 2:35 AM, Vivek Bhide <[hidden email]> wrote:
[...]


Re: Need help on TimeBasedDedupOperator

Vivek Bhide
Thanks Bhupesh.

Regards
Vivek


