How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters

apex

I have a problem getting the connection to RabbitMQ working.

RabbitMQ is hosted on the same server that the Apex application runs on. The exchange and the queue look like this:

+--------------------+---------+
|        name        |  type   |
+--------------------+---------+
| apex               | fanout  |
+--------------------+---------+

+------+----------+
| name | messages |
+------+----------+
| task | 31       |
+------+----------+

For testing, I declare the operators like this:

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("localhost");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("task");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }

But a look at the operators shows that it does not fetch any messages:

 {
    "id": "1",
    "name": "rabbitInput",
    "className": "com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator",
    "container": null,
    "host": null,
    "totalTuplesProcessed": "0",
    "totalTuplesEmitted": "0",
    "tuplesProcessedPSMA": "0",
    "tuplesEmittedPSMA": "0",
    "cpuPercentageMA": "0.0",
    "latencyMA": "0",
    "status": "PENDING_DEPLOY",
    "lastHeartbeat": "0",
    "failureCount": "0",
    "recoveryWindowId": "0",
    "currentWindowId": "0",
    "ports": [],
    "unifierClass": null,
    "logicalName": "rabbitInput",
    "recordingId": null,
    "counters": null,
    "metrics": null,
    "checkpointStartTime": "0",
    "checkpointTime": "0",
    "checkpointTimeMA": "0"
  },
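
One way to read the status above: PENDING_DEPLOY together with "container": null and "host": null suggests the operator has not been placed in a container yet, in which case it would not have tried to connect to RabbitMQ at all. Below is a minimal sketch of that reading in plain Java. The class and method names (OperatorStatusCheck, diagnose) are made up for illustration and are not part of Apex, and "ACTIVE" is assumed to be the status reported for a running operator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Apex: interprets a few fields of the
// operator JSON shown above.
final class OperatorStatusCheck {

  // Returns a short human-readable diagnosis for one operator entry.
  static String diagnose(Map<String, String> op) {
    String status = op.get("status");
    boolean hasContainer = op.get("container") != null;
    if ("PENDING_DEPLOY".equals(status) && !hasContainer) {
      return "not deployed: no container assigned, so no broker connection was attempted";
    }
    // Assumption: "ACTIVE" is the status name of a deployed, running operator.
    if ("ACTIVE".equals(status) && "0".equals(op.get("totalTuplesEmitted"))) {
      return "deployed but idle: check exchange, queue and binding";
    }
    return "status " + status;
  }

  public static void main(String[] args) {
    Map<String, String> op = new HashMap<>();
    op.put("status", "PENDING_DEPLOY");
    op.put("container", null);
    op.put("totalTuplesEmitted", "0");
    System.out.println(diagnose(op));
  }
}
```

If this reading is right, the place to look is resource allocation on the cluster rather than the RabbitMQ settings.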

What am I doing wrong here? Since I can configure the RabbitMQ side freely, is there a preferred configuration for Apex?

Cheers

Manfred.


Re: How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters

vikram patil-2
Hi Manfred,

Are you getting any exceptions in the logs? Also check whether your queue is durable.

Thanks & Regards,
Vikram

In reply to the message of Mon, Jun 26, 2017, 8:37 PM.

Re: How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters

apex

I get no exception in the apex.log and yes the queue is durable.

                                    vhost: /
                                     name: task
                              auto_delete: False
 backing_queue_status.avg_ack_egress_rate: 0.0
backing_queue_status.avg_ack_ingress_rate: 0.0
     backing_queue_status.avg_egress_rate: 0.0
    backing_queue_status.avg_ingress_rate: 0.5866956420847993
               backing_queue_status.delta: ["delta", "undefined", 0, 0, "undefined"]
                 backing_queue_status.len: 31
                backing_queue_status.mode: default
         backing_queue_status.next_seq_id: 31
                  backing_queue_status.q1: 0
                  backing_queue_status.q2: 0
                  backing_queue_status.q3: 0
                  backing_queue_status.q4: 31
    backing_queue_status.target_ram_count: infinity
                     consumer_utilisation: None
                                consumers: 0
                                  durable: True
                                exclusive: False

The goal is to connect to RabbitMQ, fetch messages, and write them to the console. I send the messages via a script or directly via the rabbitmqadmin console. Any ideas why the program does not read from RabbitMQ?
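
The two fields that matter most in the listing above are probably consumers (0) and backing_queue_status.len (31): messages are piling up and nothing is subscribed. Here is a small sketch that pulls those two values out of a "key: value" report like the one above. QueueReport and its methods are hypothetical names used only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for reading a "key: value" queue report.
final class QueueReport {

  // Splits each line at the first colon and trims both sides.
  static Map<String, String> parse(String report) {
    Map<String, String> fields = new LinkedHashMap<>();
    for (String line : report.split("\n")) {
      int colon = line.indexOf(':');
      if (colon < 0) {
        continue; // not a key/value line
      }
      fields.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
    }
    return fields;
  }

  // True when messages are waiting but no consumer is attached.
  static boolean hasStrandedMessages(Map<String, String> fields) {
    int consumers = Integer.parseInt(fields.get("consumers"));
    int length = Integer.parseInt(fields.get("backing_queue_status.len"));
    return consumers == 0 && length > 0;
  }

  public static void main(String[] args) {
    String report = "  backing_queue_status.len: 31\n"
        + "                 consumers: 0\n"
        + "                   durable: True\n";
    System.out.println(hasStrandedMessages(parse(report)));
  }
}
```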

Cheers Manfred.



In reply to vikram patil's message of 26.06.2017, 17:14.

Re: How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters

vikram patil-2
If you have used a routing key, please specify it with in.setRoutingKey(). I don't see that call in your code.

In reply to the message of Mon, Jun 26, 2017, 9:00 PM.

Re: How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters

apex

I tried it with and without a routing key, and it doesn't work either way.

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("localhost");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("task");
    in.setRoutingKey("task");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }
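
That the routing key makes no difference is what one would expect with a fanout exchange: RabbitMQ delivers a copy to every queue bound to it and ignores the key. The toy model below illustrates the rule for fanout and direct exchanges only. It is an in-memory sketch with invented names (ExchangeRoutingSketch, Binding, route), not the RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of exchange routing; it does not talk to RabbitMQ.
final class ExchangeRoutingSketch {

  // One binding: a queue name plus the routing key it was bound with.
  static final class Binding {
    final String queue;
    final String routingKey;

    Binding(String queue, String routingKey) {
      this.queue = queue;
      this.routingKey = routingKey;
    }
  }

  // Returns the queues that receive a message published with messageKey.
  // Only "fanout" and "direct" are modelled; other types match nothing here.
  static List<String> route(String exchangeType, List<Binding> bindings, String messageKey) {
    List<String> targets = new ArrayList<>();
    for (Binding b : bindings) {
      boolean fanout = "fanout".equals(exchangeType); // key is ignored
      boolean directHit = "direct".equals(exchangeType) && b.routingKey.equals(messageKey);
      if (fanout || directHit) {
        targets.add(b.queue);
      }
    }
    return targets;
  }

  public static void main(String[] args) {
    List<Binding> bindings = new ArrayList<>();
    bindings.add(new Binding("task", "task"));
    System.out.println(route("fanout", bindings, "anything")); // prints [task]
    System.out.println(route("direct", bindings, "anything")); // prints []
  }
}
```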


In reply to vikram patil's message of 26.06.2017, 17:34.

Re: How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters

apex
In reply to this post by vikram patil-2

The problem is that I don't see any connections in the RabbitMQ log. I don't even see any attempts to connect to the server; usually I should see at least some failed attempts. I'm posting the complete program; perhaps I have an error somewhere in it.

package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;


@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("localhost");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setRoutingKey("rktest");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }
}

I'm really at my wits' end here. There are no errors in any log file, but the stream is not connecting either, for reasons I can't understand. This is the client library I'm using:
<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.1.0</version>
</dependency>

Cheers
Manfred.

Re: How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters

apex

I tested a bit further and got this fascinating result:

  1. It works in local mode
  2. It does not work when deployed in the Hadoop cluster (there are no errors in the YARN log files)

Any hints why it does not throw an error but does not work either? Are there places other than the YARN logs where I can dig for an error?
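
One difference between the two modes that may be worth ruling out: in a cluster the operator runs in a container on whichever node YARN picks, and "localhost" then refers to that node, not necessarily to the machine running RabbitMQ. Whether this played a role here is not established. Here is a quick way to check whether a configured host name is a loopback address, using only the JDK. BrokerHostCheck is a made-up name, and the second address is just an example of a LAN address.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper: tells whether a host setting only works on the same machine.
final class BrokerHostCheck {

  // True if the host resolves to a loopback address such as 127.0.0.1.
  static boolean isLoopback(String host) {
    try {
      return InetAddress.getByName(host).isLoopbackAddress();
    } catch (UnknownHostException e) {
      throw new IllegalArgumentException("cannot resolve " + host, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(isLoopback("127.0.0.1"));     // prints true
    System.out.println(isLoopback("192.168.33.63")); // prints false
  }
}
```

A loopback value is fine in local mode but reaches the broker in a cluster only if the container happens to run on the broker's own node.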



In reply to the message of 27.06.2017, 11:44.

Re: How to set RabbitMQ exchangeType, exchange, queueName, routingKey Parameters

apex

After various tests I finally got everything working, and for future users I'll post how.

First, the RabbitMQ configuration; this was the only one that worked:
rabbitmqadmin declare exchange name=apex type=fanout durable=false
rabbitmqadmin declare queue name=test durable=true
rabbitmqadmin declare binding source="apex" destination_type="queue" destination="test" routing_key="rktest"


It is important that the exchange is non-durable, because Apex only accepted a non-durable exchange. This means you have to recreate it every time you restart your RabbitMQ service.
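
A possible explanation, stated as an assumption and not confirmed in this thread: if the Apex operator declares the exchange itself as non-durable when it starts, a durable exchange of the same name that already exists would clash, because RabbitMQ refuses to re-declare an existing exchange with different properties. The sketch below models only that equivalence rule in memory. ExchangeDeclareSketch is an invented name and the error text is a simplified imitation of the broker's message.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of how a broker treats repeated exchange declarations;
// it does not talk to RabbitMQ.
final class ExchangeDeclareSketch {

  private final Map<String, Boolean> durableByName = new HashMap<>();

  // Declaring again with the same durability is a no-op;
  // declaring again with a different durability is rejected.
  void declare(String name, boolean durable) {
    Boolean existing = durableByName.putIfAbsent(name, durable);
    if (existing != null && existing != durable) {
      throw new IllegalStateException(
          "PRECONDITION_FAILED: inequivalent arg 'durable' for exchange '" + name + "'");
    }
  }

  public static void main(String[] args) {
    ExchangeDeclareSketch broker = new ExchangeDeclareSketch();
    broker.declare("apex", true); // created by hand as durable
    try {
      broker.declare("apex", false); // assumed: what the operator asks for
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```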

The "Mkdirs failed to create" error:
This just means that the DFS service is down or, in my case, that safe mode is on. Check the state and, if it is on, leave safe mode:
hdfs dfsadmin -safemode get
hdfs dfsadmin -safemode leave


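My example uses the following (I moved the operator values into a corresponding *.xml file; they are listed here only for better understanding):

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Read messages from the "test" queue bound to the "apex" fanout exchange
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("192.168.33.63");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("test");

    // Write each message as one line to files under /hdfs/rabbitMQ
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
    out.setFilePath("/hdfs/rabbitMQ");
    out.setBaseName("rabbitOut");
    out.setMaxLength(1024);
    out.setRotationWindows(4);
    dag.addStream("data", in.outputPort, out.input);
  }
}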


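And the corresponding output operator. The only important point here is that it extends AbstractFileOutputOperator<byte[]>, so that its tuple type matches the byte[] tuples on the input operator's port:

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import javax.validation.constraints.NotNull;

import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

public class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;

  @NotNull
  private String baseName;

  // Decode the message as UTF-8, append a line separator, and encode it again
  @Override
  public byte[] getBytesForTuple(byte[] t) {
    String result = new String(t, CS) + NL;
    return result.getBytes(CS);
  }

  // All tuples go to the same base file name
  @Override
  protected String getFileName(byte[] tuple) {
    return baseName;
  }

  public String getBaseName() { return baseName; }
  public void setBaseName(String v) { baseName = v; }
}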
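
The conversion in getBytesForTuple can be tried out without Apex. The sketch below copies just that step into a plain class (LineBytesDemo and toLine are names chosen for this example) to show that each message comes out as its own line.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Stand-alone copy of the byte[] -> line conversion, for experimenting outside Apex.
final class LineBytesDemo {

  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;

  // Decodes the payload as UTF-8, appends the platform line separator, re-encodes.
  static byte[] toLine(byte[] tuple) {
    return (new String(tuple, CS) + NL).getBytes(CS);
  }

  public static void main(String[] args) {
    byte[] message = "hello from RabbitMQ".getBytes(CS);
    // print() is used because toLine already adds the line break
    System.out.print(new String(toLine(message), CS));
  }
}
```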

The most pressing issue was that the application would not run on the YARN cluster, only in local mode. I still have no idea why it didn't run, but my best guess is that it was a bad idea from the start to try the Apex app on a Raspberry Pi 3 cluster. I switched to a standard Arch Linux server with 8GB RAM, and without changing a thing in the application it worked perfectly.

Thanks for all the help!
In reply to the message of 27.06.2017, 17:49.
