hdfs file write operator is increasing the latency - resulting entire DAG to fail

Raja.Aravapalli

Team,

We have an Apex application that reads from Kafka and writes to HDFS.

The data flow on the Kafka topic is very high, around 2,500 messages per second.

The issue we are facing is:

The operator that writes to HDFS (it extends AbstractFileOutputOperator) builds up latency over time and eventually fails. Can someone please share thoughts on how I can handle this?

Thanks a lot.

Regards,

Raja.

Re: hdfs file write operator is increasing the latency - resulting entire DAG to fail

Pramod Immaneni
Hi Raja,

How many partitions do you have for the file output operator, and what would you say your data write rate is in bytes/second?

Thanks

Re: [EXTERNAL] Re: hdfs file write operator is increasing the latency - resulting entire DAG to fail

Raja.Aravapalli

 

Thanks for the response, Pramod.

- My HDFS operator runs as a single partition, with an input of approx. 1000 msgs per sec. I am not sure how to partition this operator.

- I am not sure how to check the bytes/sec, but I expect it is high, because my message size in Kafka is approx. 2 KB. => input of 1000 msgs per sec * 2 KB = approx. 2 MB per sec [rough calculation]

- For your information, I have currently set the memory for this operator to 20 GB using the property below, which I feel is very large.
<property>
    <name>dt.operator.HDFS_operator.attr.MEMORY_MB</name>
    <value>20480</value>
</property>
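
As a rough check of the estimate above, the write rate is just the message rate multiplied by the average message size. The sketch below uses the figures quoted in this post (1000 msgs per sec, approx. 2 KB each, taken here as 2048 bytes); the class and method names are hypothetical and are not part of Apex.

```java
public class WriteRateEstimate {

    // Bytes written per second for a given message rate and average message size.
    static long bytesPerSecond(long messagesPerSecond, long avgMessageBytes) {
        return messagesPerSecond * avgMessageBytes;
    }

    public static void main(String[] args) {
        // Figures quoted in the post: approx. 1000 msgs/sec at approx. 2 KB each.
        long rate = bytesPerSecond(1000, 2 * 1024);
        double megabytes = rate / (1024.0 * 1024.0);
        System.out.println(rate + " bytes/sec, roughly " + megabytes + " MB/sec");
    }
}
```

With the 2,500 msgs per sec mentioned in the first post, the same calculation gives roughly 5 MB per sec.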

 

Please advise.

 

 

Thanks a lot.

 

Raja.

 

Re: [EXTERNAL] Re: hdfs file write operator is increasing the latency - resulting entire DAG to fail

Pramod Immaneni
If the data can be written to different files, then you can have multiple partitions, with each partition writing to a disjoint set of files. You cannot have two partitions writing to the same file.

Because the file output operator lets the implementation supply a filename for every tuple, you could provide different filenames in the different partitions. To make all data belonging to the same file go to the same partition, you may need to specify a stream codec. Please see https://ci.apache.org/projects/apex-core/apex-core-javadoc-release-3.6/com/datatorrent/api/StreamCodec.html
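
To illustrate the idea, here is a minimal, self-contained sketch of the grouping logic only. It does not use the Apex classes: in a real application this logic would sit in the getPartition method of a StreamCodec implementation. The tuple layout ("fileKey,payload"), the class name, and the helper names are hypothetical, and the sketch assumes that the lower bits of the returned code select the partition when the partition count is a power of two.

```java
public class FileNamePartitionSketch {

    // Hypothetical tuple layout "fileKey,payload": the text before the first
    // comma decides which output file the tuple is written to.
    static String fileNameFor(String tuple) {
        int comma = tuple.indexOf(',');
        return comma < 0 ? tuple : tuple.substring(0, comma);
    }

    // Plays the role of a stream codec's getPartition: tuples destined for
    // the same file always produce the same code.
    static int getPartition(String tuple) {
        return fileNameFor(tuple).hashCode();
    }

    // Assumption: numPartitions is a power of two and the lower bits of the
    // code pick the partition. The mask keeps the result non-negative.
    static int partitionIndex(int code, int numPartitions) {
        return code & (numPartitions - 1);
    }

    public static void main(String[] args) {
        String first = "events-2017-07-13,first record";
        String second = "events-2017-07-13,second record";
        int a = partitionIndex(getPartition(first), 4);
        int b = partitionIndex(getPartition(second), 4);
        // Both tuples name the same file, so they land in the same partition.
        System.out.println(a == b); // prints "true"
    }
}
```

Since the code depends only on the filename, no two partitions end up writing to the same file, which is the constraint described above.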

To specify the number of partitions, for example 4, you can use the following attribute:

<property>
    <name>dt.operator.HDFS_operator.attr.PARTITIONER</name>
    <value>com.datatorrent.common.partitioner.StatelessPartitioner:4</value>
</property>

Second, the rate you mentioned, 2 MB/s, isn't too high for a single partition, so I am wondering if something else is going on to increase the latencies. In your implementation of the operator, are you doing any buffering or any heavy processing?

Thanks

Re: [EXTERNAL] Re: hdfs file write operator is increasing the latency - resulting entire DAG to fail

Raja.Aravapalli

 

Some more information on the application:

 

 

Kafka input operator -> Deserialization of Avro -> Enrich the message with some text -> Unifier (auto-generated) -> Write to HDFS

Kafka input operator                 ----> running in 10 instances, with the ONE_TO_MANY setting
Deserialization of Avro              ----> running in 10 instances with parallel partitioning
Enrich the message with some text    ----> running in 10 instances with parallel partitioning
Unifier                              ----> running in a SINGLE instance, accumulating all the messages from the 10 partitions; receiving approx. 1000 msgs per sec; memory set to 20 GB
Write to HDFS                        ----> running in a SINGLE instance, collecting all the messages from the Unifier; receiving approx. 1000 msgs per sec; memory set to 20 GB

 

Please advise.

 

 

Regards,

Raja.

 
