Need help on achieving end-to-end exactly-once with KafkaIn and KafkaOut

9 messages

Vivek Bhide
This is a continuation of my previous question about
KafkaSinglePortExactlyOnceOutputOperator. I am trying to achieve end-to-end
exactly-once processing, with data received from one Kafka topic and finally
posted to another Kafka topic. Below are the three things needed to achieve
this, as Pramod mentioned in one of his presentations:

1. An idempotent input operator
2. Stateful operator recovery (something Apex provides out of the box)
3. Action by the output operator (similar to what AbstractFileOutputOperator
does)

I am using the KafkaSinglePortInputOperator from the
org.apache.apex.malhar.kafka package, which is not idempotent by default.
I made it idempotent as below:

public abstract class CustomAbstractKafkaInputOperator extends AbstractKafkaInputOperator {

    @Override
    public void setup(OperatorContext context) {
        super.setWindowDataManager(new FSWindowDataManager());
        super.setup(context);
    }
}

public class CustomKafkaSinglePortInputOperator extends CustomAbstractKafkaInputOperator {

    public final transient DefaultOutputPort<byte[]> outputPort = new DefaultOutputPort<>();

    @Override
    public AbstractKafkaConsumer createConsumer(Properties properties) {
        return new KafkaConsumer09(properties);
    }

    @Override
    protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message) {
        outputPort.emit(message.value());
    }
}

For the output operator I am using KafkaSinglePortExactlyOnceOutputOperator
from the org.apache.apex.malhar.kafka package. With reference to my previous
question, I made sure that all the applicable conditions mentioned there are
satisfied, and even then I am receiving the error below:

2018-03-15 12:28:49,485 INFO com.datatorrent.stram.StreamingContainerParent: child msg: Stopped running due to an exception. java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
        at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:190)
        at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
        at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
 context: PTContainer[id=4(container_1521139241132_0003_01_000013),state=ACTIVE,operators=[PTOperator[id=4,name=kafkaOutputOperator,state=PENDING_DEPLOY]]]

Questions I have are:

1. Is the way the input operator is made idempotent correct, or am I missing
anything?
2. Do I need to make each and every operator in the pipeline idempotent to
achieve this? My understanding is no, because once the tuple-to-window
mapping is established at the first operator, it doesn't change anywhere
further in the pipeline.

Regards
Vivek



--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/

Re: Need help on achieving end-to-end exactly-once with KafkaIn and KafkaOut

Pramod Immaneni
Have you tried instantiating FSWindowDataManager and setting it on the operator instance in populateDAG? There may be state associated with the window data manager which will be lost if you set a new instance each time in setup. You should be able to use KafkaSinglePortInputOperator directly.
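A minimal sketch of that suggestion, assuming the Malhar 3.x Kafka operators (broker address and topic names are placeholders, and the output operator's producer properties are omitted for brevity):

```java
import org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;

// Sketch only: wire FSWindowDataManager once, in populateDAG, so the
// manager (and the recovery state it tracks) is part of the operator's
// checkpointed configuration instead of being recreated on every setup().
public class ExactlyOnceApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in =
        dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
    in.setClusters("broker:9092");          // placeholder
    in.setTopics("input-topic");            // placeholder
    // Set the window data manager here, not inside setup():
    in.setWindowDataManager(new FSWindowDataManager());

    KafkaSinglePortExactlyOnceOutputOperator<byte[]> out =
        dag.addOperator("kafkaOut",
            new KafkaSinglePortExactlyOnceOutputOperator<byte[]>());
    out.setTopic("output-topic");           // placeholder
    // (Kafka producer properties for the output operator omitted.)

    dag.addStream("messages", in.outputPort, out.inputPort);
  }
}
```

This way a single FSWindowDataManager instance travels with the operator through serialization and redeploy, rather than a fresh, empty one being constructed on each container restart.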

On Thu, Mar 15, 2018 at 2:49 PM, Vivek Bhide <[hidden email]> wrote:


Re: Need help on achieving end-to-end exactly-once with KafkaIn and KafkaOut

Vivek Bhide
Hi Pramod,

I tried it, but I am not getting consistent results. It worked a few times
but then failed again with the same error. Is there anything else you would
recommend validating?

Regards
Vivek




Re: Need help on achieving end-to-end exactly-once with KafkaIn and KafkaOut

Vivek Bhide
In reply to this post by Pramod Immaneni
Hi Pramod,

We did some more research by adding more logging to the Kafka input
operator, and below are our findings.

Application setup:
1. The WindowDataManager for the Kafka input operator is set to FSWindowDataManager
2. The streaming window for the application is increased from 0.5 seconds to
5 seconds, to reproduce the issue more easily
3. Two custom classes were created for the input and output operators, solely
for the purpose of adding debug logging

Logs for KafkaIn before operator failure :
--------------------------------------------
2018-03-20 19:36:49,494 INFO
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771822 : 48
2018-03-20 19:36:49,599 INFO
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
2018-03-20 19:36:54,496 INFO
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771823 : 48
2018-03-20 19:36:54,578 INFO
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
window 6535189514237771824 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}

Logs of the KafKaIn after recovery :
------------------------------------
CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771822 : 48
2018-03-20 19:37:06,664 INFO
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
2018-03-20 19:37:06,665 INFO
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771823 : 48
2018-03-20 19:37:06,720 INFO  util.AsyncFSStorageAgent
(AsyncFSStorageAgent.java:save(91)) - using
/grid/5/hadoop/yarn/local/usercache/SVDATHDP/appcache/application_1519410901484_172884/container_e3125_1519410901484_172884_01_000005/tmp/chkp4360474156134593331
as the basepath for checkpointing.
2018-03-20 19:37:06,727 INFO
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771824 : 0
2018-03-20 19:37:06,768 INFO
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples
processed in window 6535189514237771825 : 0
2018-03-20 19:37:06,810 INFO
kafkaoutputdedup.CustomKafkaSinglePortInputOperator
(CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in
window 6535189514237771826 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}

Logs of the KafkaOutput operator :
-----------------------------------

2018-03-20 19:37:06,616 INFO
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) -
Current Window : 6535189514237771822
2018-03-20 19:37:06,617 INFO
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(203))
- Rebuild the partial window after 6535189514237771823
2018-03-20 19:37:07,943 INFO
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(304))
- Partitial Window tuples :
{id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147,
name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148,
name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149,
name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146,
name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
2018-03-20 19:37:07,944 INFO
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) -
Current Window : 6535189514237771823
2018-03-20 19:37:07,944 INFO
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) -
Current Window : 6535189514237771824
2018-03-20 19:37:07,945 INFO
kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator
(CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(116)) -
Partitial window content : {id=145, name=GTNQLMEVGRWRHZQANCVM,
randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148,
name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149,
name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146,
name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
2018-03-20 19:37:07,946 ERROR engine.StreamingContainer
(StreamingContainer.java:run(1456)) - Operator set
[OperatorDeployInfo[id=3,name=kafkaOutputOperator,type=GENERIC,checkpoint={5ab1a83d00000029,
0,
0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=output,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]]
stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples
received after operator reset.
        at
com.tgt.outputdeduptest.kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator.endWindow(CustomKafkaSinglePortExatclyOnceOutputOperator.java:117)
        at
com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
        at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
        at
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
2018-03-20 19:37:07,964 INFO  producer.KafkaProducer
(KafkaProducer.java:close(613)) - Closing the Kafka producer with
timeoutMillis = 9223372036854775807 ms.
2018-03-20 19:37:08,515 INFO  engine.StreamingContainer
(StreamingContainer.java:processHeartbeatResponse(808)) - Undeploy request:
[3]


If you look at the KafkaIn logs before and after, the last window that the
operator processed is 6535189514237771823, and it was killed while processing
6535189514237771824. You can also see that the first tuple from window
6535189514237771824 is {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
When the operator recovers, it replays the tuples correctly up to
6535189514237771823, but then it sends 0 tuples for window ids
6535189514237771824 and 6535189514237771825, and then sends the complete
accumulated tuples in 6535189514237771826, with the first tuple being
{"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.

As per my understanding this is not idempotent behavior, since the
tuple-to-window assignment before the failure changed after recovery. Please
correct me if I am wrong. We believe this is causing the failure of the
output operator, because we see that it recovers correctly with the partially
processed window 6535189514237771824 (please refer to the logs). We also
verified this by adding a consumer on the output topic.

Could you please confirm whether this is an issue that needs a fix, and
suggest one if possible?

Regards
Vivek




Re: Need help on achieving end-to-end exactly-once with KafkaIn and KafkaOut

Vivek Bhide
Can someone please confirm our findings? It is very critical for us to solve
this issue, since the whole pipeline's functionality is at stake.

Regards
Vivek




Re: Need help on achieving end-to-end exactly-once with KafkaIn and KafkaOut

Pramod Immaneni
In reply to this post by Vivek Bhide
Any window that was not complete by the time the operator died is, by definition, not replayed (we don't have all the data for that window), and the output operators should not expect it to be. In your case, if the operator died during window ..24, then on restart you can expect that the input operator with the data manager will replay all windows from the checkpoint up to and including the window prior to the failure, in an idempotent fashion, but not the window during which the failure happened. Also, in idempotent replay the window is treated as the replay unit: the exact data within each window is replayed, but order is generally not guaranteed, because with partitioning the data can arrive at the output operators in a different order than in the previous run. Typically the exactly-once output operators in the library understand and work with these definitions, so I am not sure why the Kafka output operator is reporting an exactly-once violation for an incomplete window. Maybe somebody who is well versed with the output operator code can comment?
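These replay semantics can be sketched as a toy model (all names here are hypothetical, not Apex APIs): completed windows replay exactly as first recorded, while tuples from the in-flight window reappear in a fresh window after recovery — matching Vivek's logs, where the window ..24 tuples resurface in ..26.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of idempotent replay: only fully completed windows are
// recorded (as a window data manager would record them); on recovery
// those windows are replayed exactly, and tuples past the last completed
// window are consumed again into brand-new window ids.
public class WindowReplaySim {

  /** Windows that completed before the crash (the incomplete one is dropped). */
  static Map<Long, List<String>> completedWindows(
      List<String> tuples, int perWindow, long firstWindow, int processedBeforeCrash) {
    Map<Long, List<String>> windows = new LinkedHashMap<>();
    int recorded = (processedBeforeCrash / perWindow) * perWindow;
    for (int i = 0; i < recorded; i++) {
      long w = firstWindow + i / perWindow;
      windows.computeIfAbsent(w, k -> new ArrayList<>()).add(tuples.get(i));
    }
    return windows;
  }

  /** Replay the completed windows, then resume leftover tuples in new windows. */
  static Map<Long, List<String>> recover(
      List<String> tuples, int perWindow, long firstWindow,
      int processedBeforeCrash, long resumeWindow) {
    Map<Long, List<String>> out =
        completedWindows(tuples, perWindow, firstWindow, processedBeforeCrash);
    int offset = out.size() * perWindow; // last committed offset
    for (int i = offset; i < tuples.size(); i++) {
      long w = resumeWindow + (i - offset) / perWindow;
      out.computeIfAbsent(w, k -> new ArrayList<>()).add(tuples.get(i));
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> tuples = Arrays.asList("t1", "t2", "t3", "t4", "t5", "t6");
    // Crash after 5 tuples: windows 22 and 23 completed, window 24 in flight.
    System.out.println("before crash: " + completedWindows(tuples, 2, 22L, 5));
    System.out.println("after recovery: " + recover(tuples, 2, 22L, 5, 26L));
  }
}
```

In this model window 24 never reappears after recovery; an exactly-once output operator therefore has to reconcile its partially written data against later windows rather than expect the same window id, which is exactly where the assumption discussed in this thread breaks down.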

Thanks

On Tue, Mar 20, 2018 at 6:16 PM, Vivek Bhide <[hidden email]> wrote:


Re: Need help on achieving end-to-end exactly-once with KafkaIn and KafkaOut

Sandesh Hegde
The Kafka exactly-once output operator assumes that partial-window data will arrive in the same window after recovery. This is a bug which needs to be fixed. It doesn't affect Kafka 0.11, as a different mechanism is used there to achieve exactly-once.
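For context (an editorial addition, not from the thread): the different mechanism available from Kafka 0.11 onward is presumably the idempotent/transactional producer. A generic sketch outside Apex, with placeholder broker, topic, and transactional.id values; it needs a running 0.11+ broker to execute:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Sketch of the Kafka 0.11+ transactional producer. On restart with the
// same transactional.id, uncommitted writes from the previous incarnation
// are aborted, so replayed tuples are not duplicated for consumers reading
// with isolation.level=read_committed. All broker/topic/id values are
// placeholders.
public class TransactionalWriteSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");            // placeholder
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("enable.idempotence", "true");
    props.put("transactional.id", "kafka-out-operator-0");    // stable per writer

    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
    producer.initTransactions();
    try {
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("output-topic", "payload".getBytes()));
      producer.commitTransaction(); // e.g. once per committed window
    } catch (ProducerFencedException e) {
      producer.close();             // another incarnation took over; stop
    } catch (KafkaException e) {
      producer.abortTransaction();  // retriable: abort and retry the batch
    }
  }
}
```

Committing one transaction per streaming window would make the partial-window problem above moot: an aborted transaction from the failed incarnation leaves nothing visible to read_committed consumers.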

On Wed, Mar 21, 2018 at 11:04 AM Pramod Immaneni <[hidden email]> wrote:


Re: Need help on achieving end-to-end exactly-once with KafkaIn and KafkaOut

Vivek Bhide
Thanks, Sandesh, for the confirmation. Can you point us to an updated
version of this output operator?

Regards
Vivek




Re: Need help on achieving end-to-end exactly-once with KafkaIn and KafkaOut

Sandesh Hegde
If you are looking for an output operator that supports Kafka 0.11, it has been implemented by DataTorrent and will be contributed back to the community in the coming weeks. As for the bug in the earlier versions of the operator, it is an open item which needs to be picked up by the community.

On Wed, Mar 21, 2018 at 4:39 PM Vivek Bhide <[hidden email]> wrote: