Apex and RabbitMQ problems with the input operator

apex

Hello, (mea culpa for messing up the headline the first time)

I'm currently trying to get the apex-malhar RabbitMQ operator running, but I'm at a complete loss: while the examples run fine, I can't even get RabbitMQInputOperatorTest.java to run.

First it couldn't find the RabbitMQ client, which was solvable by adding this dependency:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

But now it doesn't find the packages com.datatorrent.contrib.helper and com.datatorrent.lib.helper, and several symbols cannot be found.

Needless to say, I'm a beginner with Apex, so does anyone know what exactly I'm doing wrong here?

Cheers

Manfred.



Re: Apex and RabbitMQ problems with the input operator

Sanjay Pujare
Both com.datatorrent.contrib.helper and com.datatorrent.lib.helper are under the test directories of malhar-contrib and malhar-library, respectively. You may need to build the test jars yourself and add them with test scope to include these packages.

(In reply to the message from [hidden email] of Wed, May 31, 2017 at 9:39 AM.)




Re: Apex and RabbitMQ problems with the input operator

apex

Hello,

I compiled the whole thing, but now I don't know exactly how to get it running in Apex. Do I need an Application.java like in the tutorial? I have a simple RabbitMQ queue up and running on the server. How do I consume the messages with Apex and write them to HDFS?

Cheers,

Manfred

The following steps were necessary to get the RabbitMQ test to compile:

@TimeoutException
import java.util.concurrent.TimeoutException;
public void setup() throws IOException, TimeoutException
public void teardown() throws IOException, TimeoutException
protected void runTest(final int testNum) throws IOException

@Build jars
cd apex-malhar/contrib/
mvn clean package -DskipTests

cd apex-malhar/library/
mvn clean package -DskipTests
Copy the resulting jars to the project directory.

@Link them to the project
Add the following lines to the pom.xml:
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>lib</groupId>
    <artifactId>com.datatorrent.lib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
    <groupId>Attribute</groupId>
    <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
</dependency>
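
The TimeoutException change in the first step can be illustrated without the RabbitMQ client. Presumably the newer amqp-client declares java.util.concurrent.TimeoutException on some connection calls, so every method that calls them has to declare it too. The sketch below assumes this; FakeBroker and BrokerFixture are hypothetical stand-ins, not classes from amqp-client or Malhar.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical test fixture: setup() and teardown() call methods that declare
// two checked exceptions, so both must appear in their own throws clauses.
class BrokerFixture {
  final FakeBroker broker = new FakeBroker();

  void setup() throws IOException, TimeoutException {
    broker.connect();
  }

  void teardown() throws IOException, TimeoutException {
    broker.close();
  }

  public static void main(String[] args) throws IOException, TimeoutException {
    BrokerFixture fixture = new BrokerFixture();
    fixture.setup();
    System.out.println("open after setup: " + fixture.broker.isOpen());
    fixture.teardown();
    System.out.println("open after teardown: " + fixture.broker.isOpen());
  }
}

// Hypothetical stand-in for a client whose connect/close calls declare
// IOException and TimeoutException (assumption, not the real client API).
class FakeBroker {
  private boolean open;

  void connect() throws IOException, TimeoutException {
    open = true;
  }

  void close() throws IOException, TimeoutException {
    open = false;
  }

  boolean isOpen() {
    return open;
  }
}
```

If setup() declared only IOException, the compiler would reject the call to connect() with an "unreported exception" error, which matches the kind of failure the signature changes above work around.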


(In reply to Sanjay Pujare's message of 31.05.2017, 18:57.)





Re: Apex and RabbitMQ problems with the input operator

Vikram Patil
Yes, you would need an Application.java, which is the way to define the DAG
for an Apex application.

Please have a look at the code of the following example to see how a
JMS ActiveMQ based application is written:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ

This is how you can instantiate a RabbitMQInputOperator and add it to a DAG:
RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);

https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag

The following properties need to be specified in properties.xml:

* tuple_blast: number of tuples emitted in each burst
* bufferSize: size of the holding buffer
* host: the address for the consumer to connect to the RabbitMQ producer
* exchange: the exchange for the consumer to connect to the RabbitMQ producer
* exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
* routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
* queueName: the queueName for the consumer to connect to the RabbitMQ producer

Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java

Thanks,
Vikram

(In reply to the message from [hidden email] of Wed, Jun 7, 2017 at 3:19 PM.)

Re: Apex and RabbitMQ problems with the input operator

apex

Thanks very much for the help. The only problem left is that I don't quite understand dag.addStream().

I tried this

RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input); 

but obviously this doesn't work. What I don't get is the difference between the ActiveMQ example and the RabbitMQ example. I looked over the RabbitMQ tests but don't understand the logic behind them.

Is this the correct way to specify properties?
  <property>
    <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
    <value>500</value>
  </property>

Cheers
Manfred.

(In reply to Vikram Patil's message of 07.06.2017, 12:03.)





Re: Apex and RabbitMQ problems with the input operator

vikram patil-2
Hi,
dag.addStream() is used to create a stream from one operator's output port to another operator's input port.
RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input);
It looks like your operator name is incorrect. In your code snippet above, the name of the RabbitMQInputOperator is "Consumer".

In the property name, you need to provide the operator name you specified in the addOperator("NAME OF THE OPERATOR", RabbitMQInputOperator.class) API call.

     dt.operator.rabbitMQIn.prop.tuple_blast (the syntax is correct, provided the operator name is correct)

(Based on your code snippet, it should be dt.operator.Consumer.prop.tuple_blast.)
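
The naming rule can be sketched as a small helper that derives the property key from the operator name, so the name passed to addOperator and the name in properties.xml cannot drift apart. OperatorPropertyKey is a hypothetical helper written for this illustration; it is not part of Apex or Malhar, and only the dt.operator.NAME.prop.PROPERTY pattern is taken from the discussion above.

```java
// Hypothetical helper (not an Apex class): builds properties.xml entries
// following the dt.operator.<operator name>.prop.<property> pattern.
class OperatorPropertyKey {

  // The operator name must be the same string that was passed to dag.addOperator(...).
  static String keyFor(String operatorName, String property) {
    return "dt.operator." + operatorName + ".prop." + property;
  }

  // Renders one <property> element in the layout used in this thread.
  static String toPropertyXml(String operatorName, String property, String value) {
    return "  <property>\n"
        + "    <name>" + keyFor(operatorName, property) + "</name>\n"
        + "    <value>" + value + "</value>\n"
        + "  </property>";
  }

  public static void main(String[] args) {
    // "Consumer" is the operator name from the snippet above.
    System.out.println(toPropertyXml("Consumer", "tuple_blast", "500"));
  }
}
```

With the operator registered as "Consumer", the helper yields the key dt.operator.Consumer.prop.tuple_blast, whereas a key built from "rabbitMQIn" would refer to an operator that does not exist in this DAG.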

I think the tests provided in Apache Malhar are very detailed. They run in local mode as unit tests, so the actual RabbitMQ broker is mocked by a custom message publisher.

For the time being, set only the host name and queue name, as follows:

// set your RabbitMQ host
consumer.setHost("localhost");
// set your RabbitMQ port
consumer.setPort(5672);
// It depends on your RabbitMQ producer configuration, but by default
// it is the default exchange with "" (no name is provided).
consumer.setExchange("");
// set your queue name
consumer.setQueueName("YOUR_QUEUE_NAME");

If it's okay, could you please share the code of your Application.java and properties.xml here?

Thanks,
Vikram


(In reply to the message from [hidden email] of Thu, Jun 8, 2017 at 12:32 AM.)



      



Re: Apex and RabbitMQ problems with the input operator

apex

Sorry, the two snippets below were from different iterations. The error I get is the following:

[ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
[ERROR]   symbol:   variable output
[ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

My code is as follows:


package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {

    RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer",RabbitMQInputOperator.class);
    rabbitInput.setHost("localhost");
    rabbitInput.setPort(5672);
    rabbitInput.setExchange("");
    rabbitInput.setQueueName("hello");
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", rabbitInput.output, out.input);
  }
}


Cheers

Manfred.



(In reply to vikram patil's message of 08.06.2017, 04:34.)






Re: Apex and RabbitMQ problems with the input operator

apex

I still don't get it completely (the rest of the code is in the previous email; this is only the relevant excerpt):

  1. dag.addStream("test", rabbitInput.output, out.input);
    Results in the following error:
    [ERROR]   symbol:   variable output
    [ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

  2. dag.addStream("test", rabbitInput.outputPort, out.input);
    Results in the following error:
    [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] no suitable method found for addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is not applicable
    [ERROR]       (inferred type does not conform to upper bound(s)
    [ERROR]         inferred: byte[]
    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
    [ERROR]       (inferred type does not conform to upper bound(s)
    [ERROR]         inferred: byte[]
    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
    [ERROR]       (cannot infer type-variable(s) T
    [ERROR]         (actual and formal argument lists differ in length))



It seems that, on the one hand, RabbitMQInputOperator does not have an output member and, on the other hand, the addStream method only accepts outputPort combined with inputPort, or output combined with input, of the corresponding classes. Does that mean I can only use a class that provides an inputPort for this example?
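
The second error above reports a type mismatch rather than a naming problem: the output port emits byte[] while the input port expects java.lang.String. A minimal sketch of that situation, using hypothetical stand-in classes (OutPort, Streams and BytesToString are not Apex classes), shows one possible way around it: a small converter placed between the two ports. UTF-8 is an assumption here; the actual encoding depends on the producer.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Wires a byte[] source to a String sink through a converter.
class PortDemo {
  static List<String> run(byte[]... messages) {
    OutPort<byte[]> rabbitOut = new OutPort<>();
    BytesToString converter = new BytesToString();
    List<String> written = new ArrayList<>();

    // Connecting rabbitOut directly to a Consumer<String> would not compile,
    // because no single T satisfies both sides.
    Streams.addStream(rabbitOut, converter.input);
    Streams.addStream(converter.output, written::add);

    for (byte[] message : messages) {
      rabbitOut.emit(message);
    }
    return written;
  }

  public static void main(String[] args) {
    System.out.println(run("hello".getBytes(StandardCharsets.UTF_8))); // prints [hello]
  }
}

// Hypothetical typed output port: forwards each tuple to the connected sink.
class OutPort<T> {
  private Consumer<? super T> sink = tuple -> { };

  void connect(Consumer<? super T> sink) {
    this.sink = sink;
  }

  void emit(T tuple) {
    sink.accept(tuple);
  }
}

// Hypothetical addStream: source and sink must agree on the tuple type T.
class Streams {
  static <T> void addStream(OutPort<T> source, Consumer<? super T> sink) {
    source.connect(sink);
  }
}

// Hypothetical converter: accepts byte[] and emits String (UTF-8 assumed).
class BytesToString {
  final OutPort<String> output = new OutPort<>();
  final Consumer<byte[]> input =
      bytes -> output.emit(new String(bytes, StandardCharsets.UTF_8));
}
```

In an actual Apex application the converter would have to be an operator of its own, or the downstream operator would need an input port that accepts byte[]; which of the two fits better is not settled in the messages above.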

Cheers

Manfred.



Am 08.06.2017 um 10:05 schrieb [hidden email]:

Sorry, the two snippets below were from different iterations. The error I get is the following:

[ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
[ERROR]   symbol:   variable output
[ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

My Code is as follows:


package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {

    RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer",RabbitMQInputOperator.class);
    rabbitInput.setHost("localhost");
    rabbitInput.setPort(5672);
    rabbitInput.setExchange("");
    rabbitInput.setQueueName("hello");
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", rabbitInput.output, out.input);
}
}

Cheers

Manfred.



Am 08.06.2017 um 04:34 schrieb vikram patil:
Hi,
dag.addStream() is used to create a stream from one operator's output port to another operator's input port.
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input); 
It looks like your operator name is incorrect. In your code snippet above, the name of the RabbitMQInputOperator is "Consumer".

In the property name, you need to provide the operator name you specified in the addOperator("NAME OF THE OPERATOR", RabbitMQInputOperator.class) API call.

     dt.operator.rabbitMQIn.prop.tuple_blast (the syntax is correct, provided your operator name is correct).

(Based on your code snippet, it should be dt.operator.Consumer.prop.tuple_blast.)
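
To make the naming rule above concrete, here is a minimal sketch. The class PropertyKeySketch and the helper operatorProperty are made-up names for illustration only and are not part of the Apex API; the sketch just assembles the key from the operator name passed to addOperator and the property name.

```java
public class PropertyKeySketch {
    // Hypothetical helper, not part of Apex: builds the properties.xml key
    // for a property of a named operator, following the
    // dt.operator.<operatorName>.prop.<propertyName> pattern shown above.
    static String operatorProperty(String operatorName, String propertyName) {
        return "dt.operator." + operatorName + ".prop." + propertyName;
    }

    public static void main(String[] args) {
        // "Consumer" is the name used in dag.addOperator("Consumer", ...).
        System.out.println(operatorProperty("Consumer", "tuple_blast"));
        // The key from the question uses a name that no operator in that DAG has.
        System.out.println(operatorProperty("rabbitMQIn", "tuple_blast"));
    }
}
```

If the name in the key does not match any operator in the DAG, the property has nothing to apply to, which is why the two names need to agree.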

I think the tests provided in Apache Malhar are very detailed. They run in local mode as unit tests, so we have mocked the actual RabbitMQ with a custom message publisher.

For the time being, you only need to set the host name and queue name (shown here together with port and exchange):

// set your rabbitmq host
consumer.setHost("localhost");
// set your rabbitmq port
consumer.setPort(5672);
// It depends on your rabbitmq producer configuration, but by default
// it is the default exchange with "" (no name is provided).
consumer.setExchange("");
// set your queue name
consumer.setQueueName("YOUR_QUEUE_NAME");



If it's okay, could you please share the code from your Application.java and properties.xml here?

Thanks,
Vikram


On Thu, Jun 8, 2017 at 12:32 AM, <[hidden email]> wrote:

Thanks very much for the help. The only problem left is that I don't quite understand dag.addStream().

I tried this

RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input); 

but obviously this doesn't work. What I don't get is the difference between the ActiveMQ example and the RabbitMQ example. I looked over the test examples for RabbitMQ but don't seem to understand the logic behind it.

Is this the correct way to specify properties?
  <property>
    <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
    <value>500</value>
  </property>

Cheers
Manfred.


Am 07.06.2017 um 12:03 schrieb Vikram Patil:
Yes, you would need an Application.java, which is the way to define a DAG
for an Apex application.

Please have a look at the code from the following example to see how to
write a JMS ActiveMQ based example:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ

This is how you can instantiate RabbitMQInputOperator and add it to a DAG:
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);

https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag

The following properties need to be specified in properties.xml:

Properties:
* tuple_blast: number of tuples emitted in each burst
* bufferSize: size of the holding buffer
* host: the address for the consumer to connect to the RabbitMQ producer
* exchange: the exchange for the consumer to connect to the RabbitMQ producer
* exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
* routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
* queueName: the queueName for the consumer to connect to the RabbitMQ producer

Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java

Thanks,
Vikram

On Wed, Jun 7, 2017 at 3:19 PM,  [hidden email] wrote:
Hello,

I compiled the whole thing but now I don't know exactly how to get it
running in Apex. Do I need an application.java like in the tutorial? I do
have a simple RabbitMQ queue up and running on the server. How do I consume
the messages with Apex and write them to hdfs?

Cheers,

Manfred

The following steps were necessary to get the RabbitMQ test to compile:

@TimeoutException
import java.util.concurrent.TimeoutException;
public void setup() throws IOException,TimeoutException
public void teardown() throws IOException,TimeoutException
protected void runTest(final int testNum) throws IOException

@Build jars
cd apex-malhar/contrib/
mvn clean package -DskipTests

cd apex-malhar/library/
mvn clean package -DskipTests
copy packages to project directory

@Link them to the project
Add following lines to the pom.xml
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>lib</groupId>
    <artifactId>com.datatorrent.lib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
    <groupId>Attribute</groupId>
    <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
</dependency>


Am 31.05.2017 um 18:57 schrieb Sanjay Pujare:

Both com.datatorrent.contrib.helper and  com.datatorrent.lib.helper are
under the test directory under malhar-contrib and malhar-library
respectively. You may need to build these jars yourself with test scope to
include these packages.


Re: Apex and RabbitMQ problems with the input operator

apex

Okay, I found the error. I had copied LineOutputOperator.java from the jmsActiveMQ example, where the class is declared as
public class LineOutputOperator extends AbstractFileOutputOperator<String>

Instead, I took LineOutputOperator.java from the Kafka 0.9 example; there the class is defined correctly for the RabbitMQInputOperator.

So far so good: it now compiles without errors.

Cheers

Manfred
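
For anyone hitting the same compiler error, here is a minimal, self-contained sketch of why a byte[] output cannot be wired to a String input. OutPort, connect and decode are toy stand-ins invented for this illustration; they are not the Apex classes. The use of UTF-8 for decoding is also an assumption, since the thread does not say how the messages are encoded.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PortTypeSketch {
    // Toy stand-in for a typed output port; NOT the Apex DefaultOutputPort.
    static class OutPort<T> {
        private final List<Consumer<? super T>> sinks = new ArrayList<>();

        void emit(T tuple) {
            for (Consumer<? super T> sink : sinks) {
                sink.accept(tuple);
            }
        }
    }

    // Same idea as addStream: the sink must accept the source's tuple type.
    static <T> void connect(OutPort<T> out, Consumer<? super T> in) {
        out.sinks.add(in);
    }

    // Assumption: the payload is UTF-8 text.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    static List<String> demo() {
        OutPort<byte[]> rabbitLike = new OutPort<>();
        List<String> lines = new ArrayList<>();

        Consumer<String> stringSink = lines::add;
        // connect(rabbitLike, stringSink); // does not compile: byte[] is not a String

        // A sink that accepts byte[] and converts each payload itself.
        Consumer<byte[]> byteSink = bytes -> lines.add(decode(bytes));
        connect(rabbitLike, byteSink);

        rabbitLike.emit("hello".getBytes(StandardCharsets.UTF_8));
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The commented-out line corresponds to the "inferred type does not conform to upper bound(s)" message: there is no type T that is both a supertype of byte[] on the output side and a subtype of String on the input side.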


Re: Apex and RabbitMQ problems with the input operator

apex

I finally got rid of all the errors, but now I have the problem that the Apex application does not seem to register at the RabbitMQ exchange.

This is my code:

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput",new RabbitMQInputOperator());
    in.setHost("localhost");
    //in.setPort(5672);
    in.setExchange("apex");
    in.setExchangeType("fanout");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console",in.outputPort, console.input);

}

}

If I launch the application, everything executes without an error, but if I list the bindings on the exchange, there are none.

Does anyone have an idea how I can start to debug this?

Cheers
Manfred.



Re: Apex and RabbitMQ problems with the input operator

vikram patil-2
1) Are you doing it on your local environment?
2) If you are doing it locally, I would suggest the following options:
     1) If you don't want to create the queue on RabbitMQ yourself, set the queue name on the operator:
        in.setQueueName("YOUR_QUEUE_NAME");
        The operator will then do the following:
           * Create a durable queue in RabbitMQ.
           * You have specified exchange and exchangeType, so it will create an exchange using this information and bind the created queue to the exchange with the default routing key, which is "".
        Right now it must be creating an auto-generated, uniquely named queue for you.

      2) You can create your own exchange and durable queue using the RabbitMQ admin. You will have to install the RabbitMQ plugins for that. You can use it to publish some test data as well.

Using apex-cli you can check the status of your application; if it is failing, you should check the logs under userlogs in the Hadoop logs directory.

Thanks & Regards,
Vikram
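
To illustrate why the missing binding matters, here is a toy in-memory model of a fanout exchange. It is only a sketch: FanoutSketch, bind, publish and messages are invented names and this is not the RabbitMQ client API. It shows the general behaviour that a fanout exchange copies each message to every bound queue, and that by default a message published while nothing is bound is simply dropped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FanoutSketch {
    // Toy model: queue name -> messages delivered to that queue.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Binding a queue is what makes it appear in the exchange's binding list.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copies the message to every bound queue and returns how many queues got it.
    int publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    List<String> messages(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        // Nothing is bound yet, so this message reaches no queue.
        System.out.println(exchange.publish("before binding"));
        exchange.bind("YOUR_QUEUE_NAME");
        // Now the bound queue receives a copy.
        System.out.println(exchange.publish("after binding"));
        System.out.println(exchange.messages("YOUR_QUEUE_NAME"));
    }
}
```

So an empty binding list on the exchange suggests the operator never got as far as declaring and binding its queue, which is why checking the application status and the container logs is a sensible first step.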

  
       

On Fri, Jun 9, 2017 at 6:42 PM, <[hidden email]> wrote:

Finally got rid of all errors but now I have the problem that the apex application does not seem to register at the RabbitMQ exchange.

This is my code:

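import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // RabbitMQ input operator, pointed at the "apex" fanout exchange on localhost
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("localhost");
    //in.setPort(5672);
    in.setExchange("apex");
    in.setExchangeType("fanout");
    // Print every received tuple to the console
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }
}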

If I launch the application, everything executes without an error, but if I list the bindings on the exchange, there are none.

Does anyone have an idea how I can start to debug this?

Cheers
Manfred.



On 08.06.2017 at 18:04, [hidden email] wrote:

Okay, I found the error. I had copied LineOutputOperator.java from the jmsActiveMQ example, where the class is declared as
public class LineOutputOperator extends AbstractFileOutputOperator<String>

Instead I took the LineOutputOperator.java from the Kafka 0.9 example; there the class is defined correctly for the RabbitMQInputOperator.

So far so good; it now compiles without errors.

Cheers

Manfred

On 08.06.2017 at 17:38, [hidden email] wrote:

I still don't get it completely. (The rest of the code is in the previous email; this is only the relevant sample.)

  1. dag.addStream("test", rabbitInput.output, out.input);
    Results in the following error:
    [ERROR]   symbol:   variable output
    [ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

  2. dag.addStream("test", rabbitInput.outputPort, out.input);
    Results in the following error:
    [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] no suitable method found for addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is not applicable
    [ERROR]       (inferred type does not conform to upper bound(s)
    [ERROR]         inferred: byte[]
    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
    [ERROR]       (inferred type does not conform to upper bound(s)
    [ERROR]         inferred: byte[]
    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
    [ERROR]       (cannot infer type-variable(s) T
    [ERROR]         (actual and formal argument lists differ in length))



It seems that, on the one hand, the RabbitMQInputOperator class does not have an output field, and on the other hand, the addStream method only accepts outputPort combined with inputPort, or output combined with input, of the corresponding classes. Does that mean I can only use a class that provides an inputPort field for this example?

Cheers

Manfred.
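
The second error above says that the output port carries byte[] tuples while the connected input port expects String, and addStream needs a single tuple type T that fits both ends. The sketch below reproduces that constraint with small stand-in types and shows one way to bridge it by decoding the bytes. Everything in it is an assumption made for illustration: PortTypeSketch, OutPort, InPort, connect and decodingAdapter are hypothetical names, not Apex classes, and UTF-8 is assumed as the message encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

// Hypothetical stand-in types for illustration only; these are NOT Apex classes.
final class PortTypeSketch {
  private PortTypeSketch() {}

  /** An input port modelled as a consumer of tuples of type T. */
  interface InPort<T> extends Consumer<T> {}

  /** An output port that forwards each emitted tuple to the connected input port. */
  static final class OutPort<T> {
    private InPort<? super T> sink;

    void emit(T tuple) {
      if (sink != null) {
        sink.accept(tuple);
      }
    }
  }

  /** Same shape as the addStream signature in the error: both ends must agree on T. */
  static <T> void connect(OutPort<T> out, InPort<? super T> in) {
    out.sink = in;
  }

  /** Wraps a String input port so it can accept byte[] tuples (UTF-8 is an assumption). */
  static InPort<byte[]> decodingAdapter(InPort<String> target) {
    return bytes -> target.accept(new String(bytes, StandardCharsets.UTF_8));
  }

  // connect(new OutPort<byte[]>(), stringPort);                  // rejected: String is not a supertype of byte[]
  // connect(new OutPort<byte[]>(), decodingAdapter(stringPort)); // accepted: both ends carry byte[]
}
```

In a real Apex application the equivalent would presumably be an output operator whose input port is typed for byte[], or a converting operator placed between the two.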



On 08.06.2017 at 10:05, [hidden email] wrote:

Sorry, the two snippets below were from different iterations. The error I get is the following:

[ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
[ERROR]   symbol:   variable output
[ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

My code is as follows:


package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {

    RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer",RabbitMQInputOperator.class);
    rabbitInput.setHost("localhost");
    rabbitInput.setPort(5672);
    rabbitInput.setExchange("");
    rabbitInput.setQueueName("hello");
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", rabbitInput.output, out.input);
}
}

Cheers

Manfred.



On 08.06.2017 at 04:34, vikram patil wrote:
Hi,
dag.addStream() is used to create a stream from one operator's output port to another operator's input port.
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input);
It looks like your operator name is incorrect. In your code snippet above, the name of the RabbitMQInputOperator is "Consumer".

In the property name, you need to provide the operator name you specified in the addOperator("NAME OF THE OPERATOR", RabbitMQInputOperator.class) API call.

     dt.operator.rabbitMQIn.prop.tuple_blast (the syntax is correct, provided the operator name is correct).

(Based on your code snippet, it should be
dt.operator.Consumer.prop.tuple_blast.)
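
To make the naming rule concrete, here is a minimal sketch that assembles the property key from the operator name passed to addOperator and the property being set. OperatorPropertyKey is a hypothetical helper written for this illustration; it is not part of Apex, and it only follows the dt.operator.NAME.prop.PROPERTY pattern described above.

```java
// Hypothetical helper for illustration; not part of the Apex API.
final class OperatorPropertyKey {
  private OperatorPropertyKey() {}

  /** Builds "dt.operator.<operatorName>.prop.<propertyName>". */
  static String of(String operatorName, String propertyName) {
    return "dt.operator." + operatorName + ".prop." + propertyName;
  }
}
```

With the operator registered as "Consumer", OperatorPropertyKey.of("Consumer", "tuple_blast") gives dt.operator.Consumer.prop.tuple_blast, which is the name to use in properties.xml.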

I think the tests provided in Apache Malhar are very detailed; they run in local mode as unit tests, so we have mocked the actual RabbitMQ with a custom message publisher.

For the time being, set only the queue name and host name, as follows:

// set your rabbitmq host
consumer.setHost("localhost");
// set your rabbitmq port
consumer.setPort(5672);
// It depends on your rabbitmq producer configuration, but by default
// it's the default exchange with "" (no name is provided).
consumer.setExchange("");
// set your queue name
consumer.setQueueName("YOUR_QUEUE_NAME");

If it's okay, could you please share the code from your application.java and properties.xml here?

Thanks,
Vikram


On Thu, Jun 8, 2017 at 12:32 AM, <[hidden email]> wrote:

Thanks very much for the help. The only problem left is that I don't quite understand dag.addStream().

I tried this

RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input); 

but obviously this doesn't work. What I don't get is the difference between the ActiveMQ example and the RabbitMQ example. I looked over the test examples for RabbitMQ but don't seem to understand the logic behind them.

Is this the correct way to specify properties:
  <property>
    <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
    <value>500</value>
  </property>

Cheers
Manfred.


On 07.06.2017 at 12:03, Vikram Patil wrote:
Yes, you would need an Application.java, which is the way to define the DAG
for an Apex application.

Please have a look at the code of the following example to see how to
write a JMS ActiveMQ based application:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ

This is how you can instantiate the RabbitMQInputOperator and add it to a DAG:
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);

https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag

The following properties need to be specified in properties.xml:

* tuple_blast: number of tuples emitted in each burst
* bufferSize: size of the holding buffer
* host: the address for the consumer to connect to the RabbitMQ producer
* exchange: the exchange for the consumer to connect to the RabbitMQ producer
* exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
* routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
* queueName: the queueName for the consumer to connect to the RabbitMQ producer

Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java

Thanks,
Vikram

On Wed, Jun 7, 2017 at 3:19 PM,  [hidden email] wrote:
Hello,

I compiled the whole thing but now I don't know exactly how to get it
running in Apex. Do I need an application.java like in the tutorial? I do
have a simple RabbitMQ queue up and running on the server. How do I consume
the messages with Apex and write them to hdfs?

Cheers,

Manfred

The following steps were necessary to get the RabbitMQ test to compile:

@TimeoutException
import java.util.concurrent.TimeoutException;
public void setup() throws IOException,TimeoutException
public void teardown() throws IOException,TimeoutException
protected void runTest(final int testNum) throws IOException

@Build jars
cd apex-malhar/contrib/
mvn clean package -DskipTests

cd apex-malhar/library/
mvn clean package -DskipTests
copy packages to project directory

@Link them to the project
Add the following lines to the pom.xml:
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>lib</groupId>
    <artifactId>com.datatorrent.lib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
    <groupId>Attribute</groupId>
    <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
</dependency>



Re: Apex and RabbitMQ problems with the input operator

darshan khade
In reply to this post by apex
I need to unsubscribe from these Apex mails.

Thank you.

Sent from BlueMail

Re: Apex and RabbitMQ problems with the input operator

apex
In reply to this post by vikram patil-2

Hello,

@2.) Does it have to be a durable queue or does an exchange suffice?

Cheers

Manfred


Am 09.06.2017 um 15:23 schrieb vikram patil:
1) Are you doing it on your local environment?
2) If you are doing it locally I would suggest following options
     1) If you dont want to create queue on rabbitmq by yourself . Set queuename on operator
        in.setQueueName("YOUR_QUEUE_NAME" )
         Operator will do following steps :
           * Create Durable Queue in RabbitMQ
           * You have specfied exchange and exchangeType .
                 So it will create an exchange using this  information and bind created queue with exchange with default routing key which will be "".
        Right now it must be creating auto generated unique named queue for you. 
       
      2) You can create your own exchange and durable queue using rabbitmq admin . You will have to install rabbitmq plugins for that. You can use it to publish some test data as well.

Using apex-cli you can check status of your application, if its failing then you should check logs from userlogs in hadoop logs directory.

Thanks & Regards,
Vikram

  
       

On Fri, Jun 9, 2017 at 6:42 PM, <[hidden email]> wrote:

Finally got rid of all errors but now I have the problem that the apex application does not seem to register at the RabbitMQ exchange.

This is my code:

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput",new RabbitMQInputOperator());
    in.setHost("localhost");
    //in.setPort(5672);
    in.setExchange("apex");
    in.setExchangeType("fanout");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console",in.outputPort, console.input);

}

}

If I launch the application everthing executes without an error but if i list the bindings on the exchange, there is none.

Anyone even an idea how i can start to debug this?

Cheers
Manfred.



Am 08.06.2017 um 18:04 schrieb [hidden email]:

Okay i found the error, I copied the LineOutputOperator.java from the jmsActiveMQ example and found  there
public class LineOutputOperator extends AbstractFileOutputOperator<String>

Instead i took the LineOutputOperator.java  from the Kafka 0.9 example there the class is correctly defined for the RabbitMQInputOperator

So far so good now it compiles without errors.

Cheers

Manfred

Am 08.06.2017 um 17:38 schrieb [hidden email]:

I still don't get it completely: (The rest of the code is in the Email before, this is only the necessary sample)

  1. dag.addStream("test", rabbitInput.output, out.input);
    Results in the following error:
    [ERROR]   symbol:   variable output
    [ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

  2. dag.addStream("test", rabbitInput.outputPort, out.input);
    Results in the following error:
    [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] no suitable method found for addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is not applicable
    [ERROR]       (inferred type does not conform to upper bound(s)
    [ERROR]         inferred: byte[]
    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
    [ERROR]       (inferred type does not conform to upper bound(s)
    [ERROR]         inferred: byte[]
    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
    [ERROR]       (cannot infer type-variable(s) T
    [ERROR]         (actual and formal argument lists differ in length))



It seems that on the one hand the RabbitMQInputOperator.class does not have an output method and on the other hand the addStream method only accepts outputPort combined with inputPort methods or output and input methods of the corresponding classes. Does that mean I only can use a class that implements inputPort method for this example?

Cheers

Manfred.



Am 08.06.2017 um 10:05 schrieb [hidden email]:

Sorry the two Snippets below where from different iterations. The Error I get is the following:

[ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
[ERROR]   symbol:   variable output
[ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

My Code is as follows:


package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {

    RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer",RabbitMQInputOperator.class);
    rabbitInput.setHost("localhost");
    rabbitInput.setPort(5672);
    rabbitInput.setExchange("");
    rabbitInput.setQueueName("hello");
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", rabbitInput.output, out.input);
}
}

Cheers

Manfred.



Am 08.06.2017 um 04:34 schrieb vikram patil:
Hi,
dag.addStream() is actually used to create stream of from one Operator output port to other operators input port.
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input); 
Looks like your operator name is incorrect? I see in your code snippet above, name of of RabbiMQInputOperator is "Consumer".

In property name, you need to provide operator name you have specified in addOperator("NAME OF THE OPERATOR", RabbitMQInputOperator.class) api call.
 
     dt.operator.rabbitMQIn.prop.tuple_blast ( Syntax is correct correct given your operator name is correct ). 

( It should be 
dt.operator.Consumer.prop.tuple_blast based on your code snippet ).

I think tests which are provided in the Apache Malhar are very detailed, they run in local mode as unit tests so we have mocked actual rabbitmq by custom message publisher. 

For timebeing you set only queuename and hostname as

   //  set your rabbitmq host . 
consumer.setHost("localhost"); // set your rabbitmq port consumer.setPort(5672) // It depends on your rabbitmq producer configuration but by default // its default exchange with "" ( No Name is provided ). consumer.setExchange(""); // set your queue name consumer.setQueueName("YOUR_QUEUE_NAME")



If its okay, could you please share code from your application.java and properties.xml here?

Thanks,
Vikram


On Thu, Jun 8, 2017 at 12:32 AM, <[hidden email]> wrote:

Thanks very much for the help. The only problem left is that I don't quite understand dag.addstream().

I tried this

RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input); 

but obviously this doesn't work. What I don't get is the difference between the ActiveMQ example and the RabbitMQ example. I looked over the test examples for RabbitMQ but don't seem to understand the logic behind it.

Is this the correct wax to specify properties:
  <property>
    <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
    <value>500</value>
  </property>

Cheers
Manfred.


Am 07.06.2017 um 12:03 schrieb Vikram Patil:
Yes, you would need Application.java which will be way to define a DAG
for Apex Application.

Please have look at the code from following example to find out how to
write JMS ActiveMQ based example:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ

This is how you can instantiate RabbitMQINputOperator and to a dag.
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);

https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag

Following properties need to be specified in properties.xml

* Properties:<br>
* <b>tuple_blast</b>: Number of tuples emitted in each burst<br>
* <b>bufferSize</b>: Size of holding buffer<br>
* <b>host</b>:the address for the consumer to connect to rabbitMQ producer<br>
* <b>exchange</b>:the exchange for the consumer to connect to rabbitMQ
producer<br>
* <b>exchangeType</b>:the exchangeType for the consumer to connect to
rabbitMQ producer<br>
* <b>routingKey</b>:the routingKey for the consumer to connect to
rabbitMQ producer<br>
* <b>queueName</b>:the queueName for the consumer to connect to
rabbitMQ producer<br>
* <br>

Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java

Thanks,
Vikram

On Wed, Jun 7, 2017 at 3:19 PM,  [hidden email] wrote:
Hello,

I compiled the whole thing but now I don't know exactly how to get it
running in Apex. Do I need an application.java like in the tutorial? I do
have a simple RabbitMQ queue up and running on the server. How do I consume
the messages with Apex and write them to hdfs?

Cheers,

Manfred

Following steps were necessary to get the RabbitMq test to compile

@TimeoutException
import java.util.concurrent.TimeoutException;
public void setup() throws IOException,TimeoutException
public void teardown() throws IOException,TimeoutException
protected void runTest(final int testNum) throws IOException

@Build jars
cd apex-malhar/contrib/
mvn clean package -DskipTests

cd apex-malhar/library/
mvn clean package -DskipTests
copy packages to project directory

@Link them to the project
Add following lines to the pom.xml
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>

<systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>lib</groupId>
    <artifactId>com.datatorrent.lib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>

<systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
    <version>1.0</version>
    <scope>system</scope>

<systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
    <groupId>Attribute</groupId>
    <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
    <version>1.0</version>
    <scope>system</scope>

<systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
</dependency>


Am 31.05.2017 um 18:57 schrieb Sanjay Pujare:

Both com.datatorrent.contrib.helper and  com.datatorrent.lib.helper are
under the test directory under malhar-contrib and malhar-library
respectively. You may need to build these jars yourself with test scope to
include these packages.

On Wed, May 31, 2017 at 9:39 AM, [hidden email] wrote:
Hello, (mea culpa for messing up the headline the first time)

I'm currently trying to get the apex-malhar rabbitmq running. But I'm at a
complete loss, while the examples are running fine I don't even get the
RabbitMQInputOperatorTest.java to run.

First it couldn't find the rabbitmq-client which was solveable by adding
the dependency:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
  </dependency>

But now it doesn't find the packages com.datatorrent.contrib.helper and
com.datatorrent.lib.helper and can't find several symbols.

Needless to say that I'm a beginner regarding Apex so does anyone know
what exactly I'm doing wrong here?

Cheers

Manfred.











Re: Apex and RabbitMQ problems with the input operator

apex
In reply to this post by vikram patil-2

Hello,

You were completely right: there seem to be problems with the Hadoop/YARN installation in my test scenario, and the application never starts. I found these entries in the log:

2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: Registering app attempt : appattempt_1495629011552_0011_000001
2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: not starting application as amIfStarted exceeds amLimit
2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: Application added - appId: application_1495629011552_0011 user: org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@11536dc, leaf-queue: default #user-pending-applications: 1 #user-active-applications: 1 #queue-pending-applications: 1 #queue-active-applications: 1


Therefore the application never leaves the state "undefined". Since the local tests were running fine and launching the application didn't raise an error, I missed the problem with the Hadoop installation. Thanks for the hint to look at the Hadoop cluster.
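
For readers who hit the same log line: "not starting application as amIfStarted exceeds amLimit" is written by the capacity scheduler when starting one more application master would push the memory used by application masters over the queue's limit. That limit is derived from yarn.scheduler.capacity.maximum-am-resource-percent, which defaults to 0.1. Below is a minimal sketch of that comparison. It is a simplified model, not the scheduler's code: the class and method names are made up, all numbers are hypothetical, and the real check works on resource objects and also applies per-user limits.

```java
public class AmLimitSketch {

    /**
     * Simplified model: a new application master (AM) is admitted only if
     * the memory of all AMs, including the new one, stays within the limit.
     */
    static boolean canStartAm(long queueMemoryMb, double maxAmPercent,
                              long amMemoryInUseMb, long newAmMemoryMb) {
        long amLimitMb = (long) (queueMemoryMb * maxAmPercent);
        long amIfStartedMb = amMemoryInUseMb + newAmMemoryMb;
        return amIfStartedMb <= amLimitMb;
    }

    public static void main(String[] args) {
        // Hypothetical small cluster: 4 GB in the queue, one 1 GB AM already running.
        System.out.println(canStartAm(4096, 0.1, 1024, 1024)); // limit is about 409 MB
        System.out.println(canStartAm(4096, 0.5, 1024, 1024)); // limit is 2048 MB
    }
}
```

With the default share the second application stays pending in this model; on a small test cluster, raising the percentage or freeing the running application master are the usual ways out.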

Cheers
Manfred.


On 09.06.2017 at 15:23, vikram patil wrote:
1) Are you doing this in your local environment?
2) If you are doing it locally, I would suggest the following options:
     1) If you don't want to create the queue in RabbitMQ yourself, set the queue name on the operator:
        in.setQueueName("YOUR_QUEUE_NAME");
        The operator will then do the following:
           * Create a durable queue in RabbitMQ.
           * Since you have specified exchange and exchangeType, create an exchange from this information and bind the created queue to it with the default routing key, which is "".
        Right now it must be creating an auto-generated, uniquely named queue for you.

     2) You can create your own exchange and durable queue using the RabbitMQ admin tool. You will have to install the RabbitMQ plugins for that. You can use it to publish some test data as well.

Using apex-cli you can check the status of your application. If it is failing, check the logs under userlogs in the Hadoop logs directory.

Thanks & Regards,
Vikram

  
       

On Fri, Jun 9, 2017 at 6:42 PM, <[hidden email]> wrote:

Finally got rid of all errors but now I have the problem that the apex application does not seem to register at the RabbitMQ exchange.

This is my code:

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("localhost");
    //in.setPort(5672);
    in.setExchange("apex");
    in.setExchangeType("fanout");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }
}

If I launch the application, everything executes without an error, but if I list the bindings on the exchange, there are none.

Does anyone have an idea how I can start to debug this?

Cheers
Manfred.



On 08.06.2017 at 18:04, [hidden email] wrote:

Okay, I found the error: I had copied LineOutputOperator.java from the jmsActiveMQ example, where the class is declared as
public class LineOutputOperator extends AbstractFileOutputOperator<String>

Instead I took LineOutputOperator.java from the Kafka 0.9 example, where the class is declared with the tuple type that fits the RabbitMQInputOperator.

So far so good; it now compiles without errors.
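
The mismatch behind this fix can be reproduced without Apex. The sketch below uses hypothetical stand-in types (OutputPort, InputPort and addStream here are simplified imitations, not the Apex classes) to show why a port that emits byte[] connects to a byte[] sink but not to a String sink.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PortTypeSketch {

    // Hypothetical stand-in for an operator input port.
    interface InputPort<T> {
        void process(T tuple);
    }

    // Hypothetical stand-in for an operator output port.
    static final class OutputPort<T> {
        private final List<InputPort<? super T>> sinks = new ArrayList<>();

        void emit(T tuple) {
            for (InputPort<? super T> sink : sinks) {
                sink.process(tuple);
            }
        }
    }

    // Simplified imitation of addStream: the sink must accept the source's tuple type.
    static <T> void addStream(String name, OutputPort<T> source, InputPort<? super T> sink) {
        source.sinks.add(sink);
    }

    static List<String> demo() {
        List<String> lines = new ArrayList<>();
        OutputPort<byte[]> rabbitOut = new OutputPort<>();

        // A sink typed for byte[] matches the byte[] output port.
        InputPort<byte[]> byteSink = t -> lines.add(new String(t, StandardCharsets.UTF_8));
        addStream("data", rabbitOut, byteSink);

        // A String sink is rejected by the compiler, as in the error above:
        // InputPort<String> stringSink = s -> lines.add(s);
        // addStream("data", rabbitOut, stringSink);

        rabbitOut.emit("hello".getBytes(StandardCharsets.UTF_8));
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```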

Cheers

Manfred

On 08.06.2017 at 17:38, [hidden email] wrote:

I still don't get it completely. (The rest of the code is in the previous email; this is only the relevant part.)

  1. dag.addStream("test", rabbitInput.output, out.input);
    Results in the following error:
    [ERROR]   symbol:   variable output
    [ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

  2. dag.addStream("test", rabbitInput.outputPort, out.input);
    Results in the following error:
    [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] no suitable method found for addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is not applicable
    [ERROR]       (inferred type does not conform to upper bound(s)
    [ERROR]         inferred: byte[]
    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
    [ERROR]       (inferred type does not conform to upper bound(s)
    [ERROR]         inferred: byte[]
    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
    [ERROR]       (cannot infer type-variable(s) T
    [ERROR]         (actual and formal argument lists differ in length))



It seems that, on the one hand, RabbitMQInputOperator does not have an output member, and on the other hand the addStream method only accepts an outputPort combined with an inputPort, or the output and input members of the corresponding classes. Does that mean I can only use a class that provides a matching input port for this example?

Cheers

Manfred.



On 08.06.2017 at 10:05, [hidden email] wrote:

Sorry, the two snippets below were from different iterations. The error I get is the following:

[ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
[ERROR]   symbol:   variable output
[ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

My Code is as follows:


package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {

    RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer",RabbitMQInputOperator.class);
    rabbitInput.setHost("localhost");
    rabbitInput.setPort(5672);
    rabbitInput.setExchange("");
    rabbitInput.setQueueName("hello");
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", rabbitInput.output, out.input);
}
}

Cheers

Manfred.



On 08.06.2017 at 04:34, vikram patil wrote:
Hi,
dag.addStream() is used to create a stream from one operator's output port to another operator's input port.
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input);
It looks like your operator name is incorrect. In your code snippet above, the name of the RabbitMQInputOperator is "Consumer".

In the property name, you need to provide the operator name you specified in the addOperator("NAME OF THE OPERATOR", RabbitMQInputOperator.class) API call.

     dt.operator.rabbitMQIn.prop.tuple_blast (the syntax is correct, provided your operator name is correct).

(It should be
dt.operator.Consumer.prop.tuple_blast based on your code snippet.)
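
As a small illustration of the naming rule described above, the property key is assembled from the operator name passed to addOperator and the property name. The helper below is hypothetical and only builds the string; it is not part of Apex.

```java
public class OperatorPropertyKey {

    // Builds a key of the form dt.operator.<operator name>.prop.<property name>.
    static String key(String operatorName, String property) {
        return "dt.operator." + operatorName + ".prop." + property;
    }

    public static void main(String[] args) {
        // "Consumer" is the name given in dag.addOperator("Consumer", ...).
        System.out.println(key("Consumer", "tuple_blast"));
    }
}
```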

I think the tests provided in Apache Malhar are very detailed. They run in local mode as unit tests, so we have mocked the actual RabbitMQ with a custom message publisher.

For the time being, set only the queue name and host name:

// set your rabbitmq host
consumer.setHost("localhost");
// set your rabbitmq port
consumer.setPort(5672);
// It depends on your rabbitmq producer configuration, but by default
// it is the default exchange with "" (no name is provided).
consumer.setExchange("");
// set your queue name
consumer.setQueueName("YOUR_QUEUE_NAME");

If it's okay, could you please share the code from your Application.java and properties.xml here?

Thanks,
Vikram


On Thu, Jun 8, 2017 at 12:32 AM, <[hidden email]> wrote:

Thanks very much for the help. The only problem left is that I don't quite understand dag.addStream().

I tried this:

RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input);

but obviously this doesn't work. What I don't get is the difference between the ActiveMQ example and the RabbitMQ example. I looked over the test examples for RabbitMQ but don't understand the logic behind them.

Is this the correct way to specify properties?
  <property>
    <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
    <value>500</value>
  </property>

Cheers
Manfred.


On 07.06.2017 at 12:03, Vikram Patil wrote:
Yes, you would need an Application.java, which is the way to define a DAG
for an Apex application.

Please have a look at the code of the following example to see how to
write a JMS ActiveMQ based application:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ

This is how you can instantiate a RabbitMQInputOperator and add it to a DAG:
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);

https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag

The following properties need to be specified in properties.xml:

* tuple_blast: number of tuples emitted in each burst
* bufferSize: size of the holding buffer
* host: the address for the consumer to connect to the RabbitMQ producer
* exchange: the exchange for the consumer to connect to the RabbitMQ producer
* exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
* routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
* queueName: the queueName for the consumer to connect to the RabbitMQ producer

Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java

Thanks,
Vikram

On Wed, Jun 7, 2017 at 3:19 PM,  [hidden email] wrote:
Hello,

I compiled the whole thing but now I don't know exactly how to get it
running in Apex. Do I need an application.java like in the tutorial? I do
have a simple RabbitMQ queue up and running on the server. How do I consume
the messages with Apex and write them to hdfs?

Cheers,

Manfred

Following steps were necessary to get the RabbitMq test to compile

@TimeoutException
import java.util.concurrent.TimeoutException;
public void setup() throws IOException,TimeoutException
public void teardown() throws IOException,TimeoutException
protected void runTest(final int testNum) throws IOException

@Build jars
cd apex-malhar/contrib/
mvn clean package -DskipTests

cd apex-malhar/library/
mvn clean package -DskipTests
copy packages to project directory

@Link them to the project
Add following lines to the pom.xml
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>lib</groupId>
    <artifactId>com.datatorrent.lib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
    <groupId>Attribute</groupId>
    <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
</dependency>













Re: Apex and RabbitMQ problems with the input operator

apex

I drilled the error down to this message:

Mkdirs failed to create file:/home/pi/datatorrent/apps/application_1498123667708_0001/checkpoints/2

I guess I have something wrong in my configuration. Does any of you know how to solve this error? Should I start the application as the same user that starts YARN?

Cheers

Manfred.















Re: Apex and RabbitMQ problems with the input operator

apex

After various tests I finally got it all working nicely, and for future users I'll post how here.

First, the RabbitMQ configuration, which was the only one that worked:
rabbitmqadmin declare exchange name=apex type=fanout durable=false
rabbitmqadmin declare queue name=test durable=true
rabbitmqadmin declare binding source="apex" destination_type="queue" destination="test" routing_key="rktest"


It is important to note that Apex only accepts a non-durable exchange. This means you have to recreate it every time you restart your RabbitMQ service.

The "Mkdirs failed to create" error:
This just means that the DFS service is down or, in my case, that safe mode is on. Check the state and, if necessary, leave safe mode:
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode leave


My example uses the following (I have since moved the operator values into a corresponding *.xml file; I list them here for better understanding):

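import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Input: consume from queue "test", bound to the fanout exchange "apex"
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("192.168.33.63");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("test");

    // Output: write each message as one line to rotating files in HDFS
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
    out.setFilePath("/hdfs/rabbitMQ");
    out.setBaseName("rabbitOut");
    out.setMaxLength(1024);
    out.setRotationWindows(4);

    dag.addStream("data", in.outputPort, out.input);
  }
}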


And the corresponding output operator. The only important thing here was that it extends AbstractFileOutputOperator<byte[]>, so that it accepts the byte[] tuples emitted by the RabbitMQ input operator:

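import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.validation.constraints.NotNull;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;

  // Name of the output file; must be set before launch
  @NotNull
  private String baseName;

  // Decode the message as UTF-8 and append a line separator
  @Override
  public byte[] getBytesForTuple(byte[] t) {
    String result = new String(t, CS) + NL;
    return result.getBytes(CS);
  }

  // All tuples go to the same base file name
  @Override
  protected String getFileName(byte[] tuple) {
    return baseName;
  }

  public String getBaseName() { return baseName; }
  public void setBaseName(String v) { baseName = v; }
}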
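
The conversion done in getBytesForTuple can be tried outside Apex. This is a minimal sketch with only the JDK; the class and method names are made up for illustration, and the line separator is passed in explicitly so the result does not depend on the platform.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class LineBytesSketch {
    private static final Charset CS = StandardCharsets.UTF_8;

    // Decodes the raw message as UTF-8, appends the separator, and re-encodes it.
    static byte[] toLine(byte[] tuple, String lineSeparator) {
        String result = new String(tuple, CS) + lineSeparator;
        return result.getBytes(CS);
    }

    public static void main(String[] args) {
        byte[] message = "hello".getBytes(CS);
        byte[] line = toLine(message, "\n");
        // The written record is the message plus one separator byte.
        System.out.println(line.length - message.length);
    }
}
```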

The most pressing issue was that it wouldn't run on the YARN cluster, only in local mode. I still have no idea why it didn't run, but my best guess is that it was a bad idea from the beginning to try the Apex app on a Raspberry Pi 3 cluster. I switched to a standard Arch Linux server with 8 GB RAM and, without changing a thing in the application, it worked perfectly.

Thanks for all the help!

Am 22.06.2017 um 11:33 schrieb [hidden email]:

I drilled the error down to this message:

Mkdirs failed to create file:/home/pi/datatorrent/apps/application_1498123667708_0001/checkpoints/2

I guess I have something buggy in my configuration. Does any of you know how to solve this error? Should I start the application with the same user that starts YARN?

Cheers

Manfred.



On 10.06.2017 at 14:50, [hidden email] wrote:

Hello,

You were completely right: it seems there are problems with the Hadoop/YARN installation in my test scenario, and the application never starts. I found these entries in the log:

2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService: Registering app attempt : appattempt_1495629011552_0011_000001
2017-06-10 14:33:02,623 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1495629011552_0011_000001 State change from NEW to SUBMITTED
2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: not starting application as amIfStarted exceeds amLimit
2017-06-10 14:33:02,624 INFO org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue: Application added - appId: application_1495629011552_0011 user: org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue$User@11536dc, leaf-queue: default #user-pending-applications: 1 #user-active-applications: 1 #queue-pending-applications: 1 #queue-active-applications: 1


Therefore the application never leaves the state "undefined". Since the local tests were running fine and the launch of the application didn't raise an error, I missed the problem with the Hadoop installation. Thanks for the correct hint to look at the Hadoop cluster.
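
The "amIfStarted exceeds amLimit" line in the log above comes from the capacity scheduler's check on how much of a queue may be occupied by application masters. Below is a rough sketch of that check. The formula is deliberately simplified (the real scheduler works on Resource objects and also applies per-user limits), all numbers are hypothetical, and it assumes the limit is derived from the yarn.scheduler.capacity.maximum-am-resource-percent setting.

```java
// Simplified, hypothetical model of the capacity scheduler's application master limit.
// It is not the actual Hadoop code; it only illustrates the comparison named in the log.
class AmLimitSketch {

  // Returns true if a new application master would still fit under the limit.
  static boolean canActivate(int queueMemoryMb, double maxAmPercent,
                             int usedAmMb, int requestedAmMb) {
    double amLimitMb = queueMemoryMb * maxAmPercent; // share of the queue allowed for AMs
    int amIfStartedMb = usedAmMb + requestedAmMb;    // AM memory if this application started
    return amIfStartedMb <= amLimitMb;
  }

  public static void main(String[] args) {
    // Hypothetical small cluster: 4096 MB in the queue, 10% allowed for application masters,
    // one application master already running with 1024 MB, a second one asking for 1024 MB.
    System.out.println(canActivate(4096, 0.1, 1024, 1024)); // prints false

    // Same request with 60% of the queue allowed for application masters.
    System.out.println(canActivate(4096, 0.6, 1024, 1024)); // prints true
  }
}
```

On a cluster with very little memory, an application that is already active can use up this budget, which would match the "#user-active-applications: 1" and "#user-pending-applications: 1" counters in the log.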

Cheers
Manfred.


On 09.06.2017 at 15:23, vikram patil wrote:
1) Are you doing it in your local environment?
2) If you are doing it locally, I would suggest the following options:
     1) If you don't want to create the queue on RabbitMQ yourself, set the queue name on the operator:
        in.setQueueName("YOUR_QUEUE_NAME");
        The operator will then do the following steps:
           * Create a durable queue in RabbitMQ.
           * You have specified exchange and exchangeType, so it will create an exchange using this information and bind the created queue to the exchange with the default routing key, which is "".
        Right now it must be creating an auto-generated, uniquely named queue for you.

     2) You can create your own exchange and durable queue using rabbitmqadmin. You will have to install the RabbitMQ plugins for that. You can use it to publish some test data as well.

Using apex-cli you can check the status of your application; if it's failing, you should check the logs under userlogs in the Hadoop logs directory.

Thanks & Regards,
Vikram

  
       

On Fri, Jun 9, 2017 at 6:42 PM, <[hidden email]> wrote:

I finally got rid of all errors, but now I have the problem that the Apex application does not seem to register at the RabbitMQ exchange.

This is my code:

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput",new RabbitMQInputOperator());
    in.setHost("localhost");
    //in.setPort(5672);
    in.setExchange("apex");
    in.setExchangeType("fanout");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }
}

If I launch the application, everything executes without an error, but if I list the bindings on the exchange, there are none.

Does anyone have an idea how I can start to debug this?

Cheers
Manfred.



On 08.06.2017 at 18:04, [hidden email] wrote:

Okay, I found the error: I had copied LineOutputOperator.java from the jmsActiveMQ example, where the class is declared as
public class LineOutputOperator extends AbstractFileOutputOperator<String>

Instead I took LineOutputOperator.java from the Kafka 0.9 example; there the class is correctly defined for the RabbitMQInputOperator.

So far so good, now it compiles without errors.
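
The type mismatch behind the compile error can be reproduced without Apex. The sketch below uses hypothetical stand-in interfaces (OutputPort, InputPort) that carry only a type parameter, plus an addStream method with the same generic shape as the signature shown in the compiler output. It is an illustration of the generics rule, not the Apex API itself.

```java
// Hypothetical stand-ins for the Apex port types, reduced to their type parameters.
class PortTypeSketch {
  interface OutputPort<T> {}
  interface InputPort<T> {}

  // Same generic shape as in the compiler error:
  // <T> addStream(String, OutputPort<? extends T>, InputPort<? super T>)
  static <T> String addStream(String id, OutputPort<? extends T> source, InputPort<? super T> sink) {
    return id;
  }

  public static void main(String[] args) {
    OutputPort<byte[]> rabbitOut = new OutputPort<byte[]>() {};
    InputPort<byte[]> bytesIn = new InputPort<byte[]>() {};
    InputPort<String> stringIn = new InputPort<String>() {};

    // Compiles: T is inferred as byte[] on both sides.
    System.out.println(addStream("data", rabbitOut, bytesIn));

    // Does not compile: no type T is both a supertype of byte[] and a subtype of String.
    // addStream("data", rabbitOut, stringIn);
  }
}
```

This is why an output operator declared for String tuples is rejected, while one declared for byte[] tuples is accepted.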

Cheers

Manfred

On 08.06.2017 at 17:38, [hidden email] wrote:

I still don't get it completely (the rest of the code is in the previous email; this is only the relevant sample):

  1. dag.addStream("test", rabbitInput.output, out.input);
    Results in the following error:
    [ERROR]   symbol:   variable output
    [ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

  2. dag.addStream("test", rabbitInput.outputPort, out.input);
    Results in the following error:
    [ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,8] no suitable method found for addStream(java.lang.String,com.datatorrent.api.DefaultOutputPort<byte[]>,com.datatorrent.api.DefaultInputPort<java.lang.String>)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>...) is not applicable
    [ERROR]       (inferred type does not conform to upper bound(s)
    [ERROR]         inferred: byte[]
    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
    [ERROR]       (inferred type does not conform to upper bound(s)
    [ERROR]         inferred: byte[]
    [ERROR]         upper bound(s): java.lang.String,java.lang.Object)
    [ERROR]     method com.datatorrent.api.DAG.<T>addStream(java.lang.String,com.datatorrent.api.Operator.OutputPort<? extends T>,com.datatorrent.api.Operator.InputPort<? super T>,com.datatorrent.api.Operator.InputPort<? super T>) is not applicable
    [ERROR]       (cannot infer type-variable(s) T
    [ERROR]         (actual and formal argument lists differ in length))



It seems that, on the one hand, RabbitMQInputOperator does not have an output member and, on the other hand, the addStream method only accepts an outputPort combined with an inputPort, or an output combined with an input, of the corresponding classes. Does that mean I can only use a class that provides an inputPort for this example?

Cheers

Manfred.



On 08.06.2017 at 10:05, [hidden email] wrote:

Sorry, the two snippets below were from different iterations. The error I get is the following:

[ERROR] /home/pi/apex/rabbitMQ/src/main/java/com/example/rabbitMQ/RabbitMQApplication.java:[31,38] cannot find symbol
[ERROR]   symbol:   variable output
[ERROR]   location: variable rabbitInput of type com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator

My code is as follows:


package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {

    RabbitMQInputOperator rabbitInput = dag.addOperator("Consumer",RabbitMQInputOperator.class);
    rabbitInput.setHost("localhost");
    rabbitInput.setPort(5672);
    rabbitInput.setExchange("");
    rabbitInput.setQueueName("hello");
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

    dag.addStream("data", rabbitInput.output, out.input);
}
}

Cheers

Manfred.



On 08.06.2017 at 04:34, vikram patil wrote:
Hi,
dag.addStream() is used to create a stream from one operator's output port to another operator's input port.
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input);
Looks like your operator name is incorrect? I see in your code snippet above that the name of the RabbitMQInputOperator is "Consumer".

In the property name, you need to provide the operator name you specified in the addOperator("NAME OF THE OPERATOR", RabbitMQInputOperator.class) API call.

     dt.operator.rabbitMQIn.prop.tuple_blast (the syntax is correct, provided your operator name is correct).

(It should be
dt.operator.Consumer.prop.tuple_blast based on your code snippet.)
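
The naming rule described above can be written down as a small helper. This is only a sketch of the pattern dt.operator.<operator name>.prop.<property name> as used in this thread; the class OperatorPropertyKey and its method are hypothetical and not part of Apex.

```java
// Hypothetical helper that builds a properties.xml key for an operator property.
class OperatorPropertyKey {

  // Pattern from the thread: dt.operator.<operator name>.prop.<property name>
  static String of(String operatorName, String propertyName) {
    return "dt.operator." + operatorName + ".prop." + propertyName;
  }

  public static void main(String[] args) {
    // The operator was registered as "Consumer" in addOperator(...),
    // so that name has to appear in the key.
    System.out.println(of("Consumer", "tuple_blast")); // prints dt.operator.Consumer.prop.tuple_blast
  }
}
```

The operator name in the key has to match the first argument of addOperator exactly, including upper and lower case.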

I think the tests provided in Apache Malhar are very detailed; they run in local mode as unit tests, so we have mocked the actual RabbitMQ with a custom message publisher.

For the time being, set only the queue name and host name:

// set your rabbitmq host
consumer.setHost("localhost");
// set your rabbitmq port
consumer.setPort(5672);
// It depends on your rabbitmq producer configuration, but by default
// it's the default exchange with "" (no name is provided).
consumer.setExchange("");
// set your queue name
consumer.setQueueName("YOUR_QUEUE_NAME");

If it's okay, could you please share the code from your application.java and properties.xml here?

Thanks,
Vikram


On Thu, Jun 8, 2017 at 12:32 AM, <[hidden email]> wrote:

Thanks very much for the help. The only problem left is that I don't quite understand dag.addStream().

I tried this

RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);
dag.addStream("data", rabbitInput.output, out.input); 

but obviously this doesn't work. What I don't get is the difference between the ActiveMQ example and the RabbitMQ example. I looked over the test examples for RabbitMQ but don't seem to understand the logic behind it.

Is this the correct way to specify properties?
  <property>
    <name>dt.operator.rabbitMQIn.prop.tuple_blast</name>
    <value>500</value>
  </property>

Cheers
Manfred.


On 07.06.2017 at 12:03, Vikram Patil wrote:
Yes, you would need an Application.java, which is the way to define a DAG
for an Apex application.

Please have a look at the code of the following example to find out how to
write a JMS ActiveMQ based example:

https://github.com/DataTorrent/examples/tree/master/tutorials/jmsActiveMQ/src/main/java/com/example/jmsActiveMQ

This is how you can instantiate RabbitMQInputOperator and add it to a DAG:
RabbitMQInputOperator consumer = dag.addOperator("Consumer",
RabbitMQInputOperator.class);

https://stackoverflow.com/questions/42207495/how-to-use-apache-apex-malhar-rabbitmq-operator-in-dag

The following properties need to be specified in properties.xml:

* tuple_blast: number of tuples emitted in each burst
* bufferSize: size of the holding buffer
* host: the address for the consumer to connect to the RabbitMQ producer
* exchange: the exchange for the consumer to connect to the RabbitMQ producer
* exchangeType: the exchangeType for the consumer to connect to the RabbitMQ producer
* routingKey: the routingKey for the consumer to connect to the RabbitMQ producer
* queueName: the queueName for the consumer to connect to the RabbitMQ producer

Reference: https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java

Thanks,
Vikram

On Wed, Jun 7, 2017 at 3:19 PM,  [hidden email] wrote:
Hello,

I compiled the whole thing but now I don't know exactly how to get it
running in Apex. Do I need an application.java like in the tutorial? I do
have a simple RabbitMQ queue up and running on the server. How do I consume
the messages with Apex and write them to hdfs?

Cheers,

Manfred

The following steps were necessary to get the RabbitMQ test to compile:

@TimeoutException
import java.util.concurrent.TimeoutException;
public void setup() throws IOException,TimeoutException
public void teardown() throws IOException,TimeoutException
protected void runTest(final int testNum) throws IOException

@Build jars
cd apex-malhar/contrib/
mvn clean package -DskipTests

cd apex-malhar/library/
mvn clean package -DskipTests
copy packages to project directory

@Link them to the project
Add the following lines to the pom.xml:
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>lib</groupId>
    <artifactId>com.datatorrent.lib.helper</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-library-3.8.0-SNAPSHOT-tests.jar</systemPath>
</dependency>
<dependency>
    <groupId>contrib</groupId>
    <artifactId>com.datatorrent.contrib.rabbitmq</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/malhar-contrib-3.8.0-SNAPSHOT.jar</systemPath>
</dependency>
<dependency>
    <groupId>Attribute</groupId>
    <artifactId>com.datatorrent.api.Attribute.AttributeMap</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/apex-api-3.7.0-SNAPSHOT.jar</systemPath>
</dependency>


On 31.05.2017 at 18:57, Sanjay Pujare wrote:

Both com.datatorrent.contrib.helper and  com.datatorrent.lib.helper are
under the test directory under malhar-contrib and malhar-library
respectively. You may need to build these jars yourself with test scope to
include these packages.

On Wed, May 31, 2017 at 9:39 AM, [hidden email] wrote:
Hello, (mea culpa for messing up the headline the first time)

I'm currently trying to get the apex-malhar rabbitmq running. But I'm at a
complete loss, while the examples are running fine I don't even get the
RabbitMQInputOperatorTest.java to run.

First it couldn't find the rabbitmq-client which was solveable by adding
the dependency:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
  </dependency>

But now it doesn't find the packages com.datatorrent.contrib.helper and
com.datatorrent.lib.helper and can't find several symbols.

Needless to say that I'm a beginner regarding Apex so does anyone know
what exactly I'm doing wrong here?

Cheers

Manfred.













Re: Apex and RabbitMQ problems with the input operator

Vikram Patil
Thanks, Manfred, for resolving this issue. I checked the code just now. As you suggested, RabbitMQInputOperator seems to support a non-durable exchange but with durable queues. That seems inconsistent. Please feel free to create a ticket for an improvement to RabbitMQInputOperator regarding this issue.

Thanks & Regards,
Vikram

On Fri, Jul 7, 2017 at 2:48 PM, <[hidden email]> wrote:

After various tests I finally got it all working nicely, and for future users I'll post here how.

First, the RabbitMQ configuration, which was the only working one:
rabbitmqadmin declare exchange name=apex type=fanout durable=false
rabbitmqadmin declare queue name=test durable=true
rabbitmqadmin declare binding source="apex" destination_type="queue" destination="test" routing_key="rktest"


It is important that Apex only accepts a non-durable exchange. This means you have to recreate it every time you restart your RabbitMQ service.

The "Mkdirs failed to create" error:
This just means that the DFS service is down or, in my case, that safe mode is on. You can check the state and leave safe mode with:
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode leave

