How to consume from two different topics in one apex application

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

How to consume from two different topics in one apex application

rishi
Hi,

I want to consume messages from two different topic(one topic on 0.8 and another one on 0.9 ssl enabled) in one single apex application, rather then running two different application for two different topics.

Right now I am able to consume both topics separately . For 0.8 I am using(KafkaSinglePortByteArrayInputOperator) and for .9 I am using (KafkaSinglePortInputOperator) classes.

Thanks
Rishi
Reply | Threaded
Open this post in threaded view
|

Re: How to consume from two different topics in one apex application

Sunil Parmar-2
You can have two separate input operator in the same app. But you have to make sure the next operator in the dag who is taking input from these two different input operator has two input ports.

Sunil Parmar

On Wed, Jul 26, 2017 at 1:18 AM, rishi <[hidden email]> wrote:
Hi,

I want to consume messages from two different topic(one topic on 0.8 and
another one on 0.9 ssl enabled) in one single apex application, rather then
running two different application for two different topics.

Right now I am able to consume both topics separately . For 0.8 I am
using(KafkaSinglePortByteArrayInputOperator) and for .9 I am using
(KafkaSinglePortInputOperator) classes.

Thanks
Rishi



--
View this message in context: http://apache-apex-users-list.78494.x6.nabble.com/How-to-consume-from-two-different-topics-in-one-apex-application-tp1797.html
Sent from the Apache Apex Users list mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: How to consume from two different topics in one apex application

rishi
Hi, Thanks for the reply.

I tried to consume from two different topics in same app , I am getting error (java.lang.NoSuchMethodError: kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;) .

When I tried consuming from kafka 9 using this(KafkaSinglePortInputOperator) operator, I was able to do it successfully , but when I am adding another one more operator(KafkaSinglePortByteArrayInputOperator) to consume from .8  in same dag I am getting the error.

For testing I am not merging kafka output to any operator, it is writing at two different location in HDFS.

Looks like there is some version issue comming , which I am not able to identify . Any help is highly appreciated.

My pom.xml looks like this=

<properties>
   
    <apex.version>3.4.0</apex.version>
    <malhar.version>3.6.0</malhar.version>
    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
    <hadoop.version>2.7.1.2.3.4.0-3485</hadoop.version>
        <hbase.version>1.1.2.2.3.4.0-3485</hbase.version>
        <kafka.version>0.9.0.1</kafka.version>
         <confluent.kafka.version>0.9.0.1-cp1</confluent.kafka.version>
        <kafka.avro.srlzr.version>2.0.1</kafka.avro.srlzr.version>
     
        <avro.version>1.7.7</avro.version>
        <json.version>1.1</json.version>
        <jodatime.version>2.9.1</jodatime.version>
        <kyroserializer.version>0.38</kyroserializer.version>
        <junit.version>4.10</junit.version>
  </properties>
 
  <repositories>
        <repository>
            <id>HDPReleases</id>
            <name>HDP Releases</name>
            <url>http://repo.hortonworks.com/content/repositories/releases/</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>HDP Jetty Hadoop</id>
            <name>HDP Jetty Hadoop</name>
            <url>http://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven</url>
        </repository>
    </repositories>
        <dependencies>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-library</artifactId>
            <version>${malhar.version}</version>
           
           
        </dependency>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>apex-common</artifactId>
            <version>${apex.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>apex-engine</artifactId>
            <version>${apex.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-contrib</artifactId>
            <version>${malhar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-kafka</artifactId>
            <version>${malhar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>

         <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${confluent.kafka.version}</version>
        </dependency>
          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${kafka.avro.srlzr.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>${json.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${jodatime.version}</version>
        </dependency>

        <dependency>
            <groupId>de.javakaffee</groupId>
            <artifactId>kryo-serializers</artifactId>
            <version>${kyroserializer.version}</version>
        </dependency>
    </dependencies>


My DAG looks like this=>

public void populateDAG(DAG dag, Configuration conf)
  {

    KafkaSinglePortInputOperator kafkaInTtce = dag.addOperator("Kafka_Input_SSL",new KafkaSinglePortInputOperator());
    kafkaInTtce.setInitialPartitionCount(Integer.parseInt(conf.get("kafka.partitioncount")));
    kafkaInTtce.setTopics(conf.get("kafka.ssl.topic"));
    kafkaInTtce.setInitialOffset(conf.get("kafka.offset"));
    kafkaInTtce.setClusters(conf.get("kafka.cluster"));
    kafkaInTtce.setConsumerProps(getKafkaProperties(conf.get("kafka.cluster"), conf));
    kafkaInTtce.setStrategy(conf.get("kafka.strategy"));

    AvroBytesConversionOperator avroConversion = dag.addOperator("Avro_Convert", new AvroBytesConversionOperator(conf.get("kafka.schema.registry")));
    ColumnsExtractOperator fieldExtract = dag.addOperator("Field_Extract", new ColumnsExtractOperator());

    WriteToHdfs hdfs = dag.addOperator("To_HDFS", new WriteToHdfs(conf.get("hdfs.filename")));
    hdfs.setMaxLength(268435456); // new file rotates after every 256mb
   
    dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInTtce.outputPort, avroConversion.input);
    dag.addStream("jsonstring_stream", avroConversion.output, fieldExtract.input);
    dag.addStream("valid_recs_into_hdfs_stream", fieldExtract.output, hdfs.input);
   
    KafkaSinglePortByteArrayInputOperator kafkaInput =  dag.addOperator("Kafka_Input_NonSSL", new KafkaSinglePortByteArrayInputOperator());
    CopyofAvroBytesConversionOperator avroConversionEstore = dag.addOperator("Avro_Convert_estore", new CopyofAvroBytesConversionOperator("http://--------"));
    CopyOfColumnsExtractOperator fieldExtractEstore = dag.addOperator("Field_Extract_Estore", new CopyOfColumnsExtractOperator());
    WriteToHdfs2 hdfs2 = dag.addOperator("To_HDFS2", new WriteToHdfs2("DeviceTypeEstore"));
    hdfs2.setMaxLength(268435456);
   
    dag.addStream("Kafka_Avro_estore_Stream", kafkaInput.outputPort, avroConversionEstore.input);
    dag.addStream("jsonstring_stream_estore", avroConversionEstore.output, fieldExtractEstore.input);
    dag.addStream("valid_recs_into_hdfs_estorestream", fieldExtractEstore.output, hdfs2.input);
  }


Error I am getting(.dt log)=>

2017-08-01 04:57:38,281 INFO  stram.StreamingAppMaster (StreamingAppMaster.java:main(99)) - Initializing Application Master.
2017-08-01 04:57:38,388 INFO  stram.StreamingAppMasterService (StreamingAppMasterService.java:serviceInit(537)) - Application master, appId=507386, clustertimestamp=1500406884031, attemptId=2
2017-08-01 04:57:38,622 WARN  util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-08-01 04:57:39,441 WARN  shortcircuit.DomainSocketFactory (DomainSocketFactory.java:<init>(117)) - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2017-08-01 04:57:40,088 INFO  kafka.AbstractKafkaInputOperator (AbstractKafkaInputOperator.java:initPartitioner(327)) - Initialize Partitioner
2017-08-01 04:57:40,089 INFO  kafka.AbstractKafkaInputOperator (AbstractKafkaInputOperator.java:initPartitioner(340)) - Actual Partitioner is class org.apache.apex.malhar.kafka.OneToManyPartitioner
2017-08-01 04:57:40,121 INFO  kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:initMetadataClients(234)) - Consumer Properties :  #
#Tue Aug 01 04:57:40 CDT 2017
security.protocol=SSL
enable.auto.commit=false
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id=org.apache.apex.malhar.kafka.AbstractKafkaInputOperatorMETA_GROUP
ssl.keystore.password=
ssl.truststore.location=/home_dir/client.truststore.jks
bootstrap.servers=
ssl.truststore.password=
ssl.keystore.location=/home_dir/server.keystore.jks
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
 
2017-08-01 04:57:40,290 INFO  utils.AppInfoParser (AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0
2017-08-01 04:57:40,291 INFO  utils.AppInfoParser (AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a
2017-08-01 04:57:41,306 INFO  kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(151)) - Partition change detected:
2017-08-01 04:57:41,307 INFO  kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 0 with assignment PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-21};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-22};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-23};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-24};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-18};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-19};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-20};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-14};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-25};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-26}
2017-08-01 04:57:41,318 INFO  kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 1 with assignment PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-6};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-8};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-2};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-3};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-13};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-16};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-9};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-10};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-11};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-28}
2017-08-01 04:57:41,322 INFO  kafka.AbstractKafkaPartitioner (AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 2 with assignment PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-5};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-7};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-17};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-1};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-4};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-29};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-15};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-0};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-27};PartitionMeta{cluster='10.66.137.93:9093', topicPartition=firefly-apps-superapp-12}
2017-08-01 04:57:41,365 INFO  zkclient.ZkEventThread (ZkEventThread.java:run(64)) - Starting ZkClient event thread.
2017-08-01 04:57:41,372 INFO  zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-08-01 04:57:41,372 INFO  zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:host.name=brdn1351.target.com
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.version=1.8.0_73
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle Corporation
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper (Environment.java:logEnv(100)) - Client environment:java.home=/usr/java/jdk1.8.0_73/jre
connectString=10.66.137.94:2181,10.66.137.95:2181,10.66.137.96:2181,10.66.137.97:2181,10.66.137.98:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@6d64b553
2017-08-01 04:57:41,388 INFO  zkclient.ZkClient (ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state SyncConnected
2017-08-01 04:57:41,392 INFO  zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(975)) - Opening socket connection to server 10.66.137.97/10.66.137.97:2181. Will not attempt to authenticate using SASL (unknown error)
2017-08-01 04:57:41,393 INFO  zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(852)) - Socket connection established to 10.66.137.97/10.66.137.97:2181, initiating session
2017-08-01 04:57:41,445 INFO  zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1235)) - Session establishment complete on server 10.66.137.97/10.66.137.97:2181, sessionid = 0x350dafbffc66af1, negotiated timeout = 30000
2017-08-01 04:57:41,447 INFO  zkclient.ZkClient (ZkClient.java:processStateChanged(711)) - zookeeper state changed (SyncConnected)
2017-08-01 04:57:41,450 ERROR stram.StreamingAppMaster (StreamingAppMaster.java:main(106)) - Exiting Application Master
java.lang.NoSuchMethodError: kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
        at com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers(KafkaMetadataUtil.java:117)
        at com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers(KafkaConsumer.java:139)
        at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:506)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
        at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
        at com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
        at com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
        at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
        at com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)

Reply | Threaded
Open this post in threaded view
|

Re: How to consume from two different topics in one apex application

Thomas Weise-2
I don't think you can use both Kafka client 0.8.x and 0.9.x within the same application. The dependencies overlap and will conflict. You can use 0.8 client to talk to 0.9 server but since you want to use SSL that's not possible (security was only added in 0.9).

I have not tried that, but you might be able to to use the Maven shade plugin to shade Apex Kafka operator along with the Kafka client so both versions can coexist within one application.

Thomas


On Tue, Aug 1, 2017 at 5:42 AM, rishi <[hidden email]> wrote:
Hi, Thanks for the reply.

I tried to consume from two different topics in same app , I am getting
error (*java.lang.NoSuchMethodError:
kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;*)
.

When I tried consuming from kafka 9 using this(KafkaSinglePortInputOperator)
operator, I was able to do it successfully , but when I am adding another
one more operator(KafkaSinglePortByteArrayInputOperator) to consume from .8
in same dag I am getting the error.

For testing I am not merging kafka output to any operator, it is writing at
two different location in HDFS.

Looks like there is some version issue comming , which I am not able to
identify . Any help is highly appreciated.

My pom.xml looks like this=

<properties>

    <apex.version>3.4.0</apex.version>
    <malhar.version>3.6.0</malhar.version>
    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
    <hadoop.version>2.7.1.2.3.4.0-3485</hadoop.version>
        <hbase.version>1.1.2.2.3.4.0-3485</hbase.version>
        <kafka.version>0.9.0.1</kafka.version>
         <confluent.kafka.version>0.9.0.1-cp1</confluent.kafka.version>
        <kafka.avro.srlzr.version>2.0.1</kafka.avro.srlzr.version>

        <avro.version>1.7.7</avro.version>
        <json.version>1.1</json.version>
        <jodatime.version>2.9.1</jodatime.version>
        <kyroserializer.version>0.38</kyroserializer.version>
        <junit.version>4.10</junit.version>
  </properties>

  <repositories>
        <repository>
            <id>HDPReleases</id>
            <name>HDP Releases</name>

<url>http://repo.hortonworks.com/content/repositories/releases/</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>HDP Jetty Hadoop</id>
            <name>HDP Jetty Hadoop</name>

<url>http://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven</url>
        </repository>
    </repositories>
        <dependencies>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-library</artifactId>
            <version>${malhar.version}</version>


        </dependency>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>apex-common</artifactId>
            <version>${apex.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>apex-engine</artifactId>
            <version>${apex.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-contrib</artifactId>
            <version>${malhar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-kafka</artifactId>
            <version>${malhar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>

         <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${confluent.kafka.version}</version>
        </dependency>
          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${kafka.avro.srlzr.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>${json.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${jodatime.version}</version>
        </dependency>

        <dependency>
            <groupId>de.javakaffee</groupId>
            <artifactId>kryo-serializers</artifactId>
            <version>${kyroserializer.version}</version>
        </dependency>
    </dependencies>


My DAG looks like this=>

public void populateDAG(DAG dag, Configuration conf)
  {

    KafkaSinglePortInputOperator kafkaInTtce =
dag.addOperator("Kafka_Input_SSL",new KafkaSinglePortInputOperator());

kafkaInTtce.setInitialPartitionCount(Integer.parseInt(conf.get("kafka.partitioncount")));
    kafkaInTtce.setTopics(conf.get("kafka.ssl.topic"));
    kafkaInTtce.setInitialOffset(conf.get("kafka.offset"));
    kafkaInTtce.setClusters(conf.get("kafka.cluster"));

kafkaInTtce.setConsumerProps(getKafkaProperties(conf.get("kafka.cluster"),
conf));
    kafkaInTtce.setStrategy(conf.get("kafka.strategy"));

    AvroBytesConversionOperator avroConversion =
dag.addOperator("Avro_Convert", new
AvroBytesConversionOperator(conf.get("kafka.schema.registry")));
    ColumnsExtractOperator fieldExtract = dag.addOperator("Field_Extract",
new ColumnsExtractOperator());

    WriteToHdfs hdfs = dag.addOperator("To_HDFS", new
WriteToHdfs(conf.get("hdfs.filename")));
    hdfs.setMaxLength(268435456); // new file rotates after every 256mb

    dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInTtce.outputPort,
avroConversion.input);
    dag.addStream("jsonstring_stream", avroConversion.output,
fieldExtract.input);
    dag.addStream("valid_recs_into_hdfs_stream", fieldExtract.output,
hdfs.input);

    KafkaSinglePortByteArrayInputOperator kafkaInput =
dag.addOperator("Kafka_Input_NonSSL", new
KafkaSinglePortByteArrayInputOperator());
    CopyofAvroBytesConversionOperator avroConversionEstore =
dag.addOperator("Avro_Convert_estore", new
CopyofAvroBytesConversionOperator("http://--------"));
    CopyOfColumnsExtractOperator fieldExtractEstore =
dag.addOperator("Field_Extract_Estore", new CopyOfColumnsExtractOperator());
    WriteToHdfs2 hdfs2 = dag.addOperator("To_HDFS2", new
WriteToHdfs2("DeviceTypeEstore"));
    hdfs2.setMaxLength(268435456);

    dag.addStream("Kafka_Avro_estore_Stream", kafkaInput.outputPort,
avroConversionEstore.input);
    dag.addStream("jsonstring_stream_estore", avroConversionEstore.output,
fieldExtractEstore.input);
    dag.addStream("valid_recs_into_hdfs_estorestream",
fieldExtractEstore.output, hdfs2.input);
  }


Error I am getting(.dt log)=>

2017-08-01 04:57:38,281 INFO  stram.StreamingAppMaster
(StreamingAppMaster.java:main(99)) - Initializing Application Master.
2017-08-01 04:57:38,388 INFO  stram.StreamingAppMasterService
(StreamingAppMasterService.java:serviceInit(537)) - Application master,
appId=507386, clustertimestamp=1500406884031, attemptId=2
2017-08-01 04:57:38,622 WARN  util.NativeCodeLoader
(NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
2017-08-01 04:57:39,441 WARN  shortcircuit.DomainSocketFactory
(DomainSocketFactory.java:<init>(117)) - The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.
2017-08-01 04:57:40,088 INFO  kafka.AbstractKafkaInputOperator
(AbstractKafkaInputOperator.java:initPartitioner(327)) - Initialize
Partitioner
2017-08-01 04:57:40,089 INFO  kafka.AbstractKafkaInputOperator
(AbstractKafkaInputOperator.java:initPartitioner(340)) - Actual Partitioner
is class org.apache.apex.malhar.kafka.OneToManyPartitioner
2017-08-01 04:57:40,121 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:initMetadataClients(234)) - Consumer
Properties :  #
#Tue Aug 01 04:57:40 CDT 2017
security.protocol=SSL
enable.auto.commit=false
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id=org.apache.apex.malhar.kafka.AbstractKafkaInputOperatorMETA_GROUP
ssl.keystore.password=
ssl.truststore.location=/home_dir/client.truststore.jks
bootstrap.servers=
ssl.truststore.password=
ssl.keystore.location=/home_dir/server.keystore.jks
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

2017-08-01 04:57:40,290 INFO  utils.AppInfoParser
(AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0
2017-08-01 04:57:40,291 INFO  utils.AppInfoParser
(AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a
2017-08-01 04:57:41,306 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(151)) - Partition change
detected:
2017-08-01 04:57:41,307 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 0
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-21};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-22};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-23};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-24};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-18};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-19};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-20};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-14};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-25};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-26}
2017-08-01 04:57:41,318 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 1
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-6};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-8};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-2};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-3};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-13};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-16};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-9};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-10};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-11};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-28}
2017-08-01 04:57:41,322 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 2
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-5};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-7};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-17};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-1};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-4};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-29};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-15};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-0};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-27};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-12}
2017-08-01 04:57:41,365 INFO  zkclient.ZkEventThread
(ZkEventThread.java:run(64)) - Starting ZkClient event thread.
2017-08-01 04:57:41,372 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-08-01 04:57:41,372 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:host.name=brdn1351.target.com
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client environment:java.version=1.8.0_73
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle
Corporation
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:java.home=/usr/java/jdk1.8.0_73/jre
connectString=10.66.137.94:2181,10.66.137.95:2181,10.66.137.96:2181,10.66.137.97:2181,10.66.137.98:2181
sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@6d64b553
2017-08-01 04:57:41,388 INFO  zkclient.ZkClient
(ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state
SyncConnected
2017-08-01 04:57:41,392 INFO  zookeeper.ClientCnxn
(ClientCnxn.java:logStartConnect(975)) - Opening socket connection to server
10.66.137.97/10.66.137.97:2181. Will not attempt to authenticate using SASL
(unknown error)
2017-08-01 04:57:41,393 INFO  zookeeper.ClientCnxn
(ClientCnxn.java:primeConnection(852)) - Socket connection established to
10.66.137.97/10.66.137.97:2181, initiating session
2017-08-01 04:57:41,445 INFO  zookeeper.ClientCnxn
(ClientCnxn.java:onConnected(1235)) - Session establishment complete on
server 10.66.137.97/10.66.137.97:2181, sessionid = 0x350dafbffc66af1,
negotiated timeout = 30000
2017-08-01 04:57:41,447 INFO  zkclient.ZkClient
(ZkClient.java:processStateChanged(711)) - zookeeper state changed
(SyncConnected)
2017-08-01 04:57:41,450 ERROR stram.StreamingAppMaster
(StreamingAppMaster.java:main(106)) - Exiting Application Master
java.lang.NoSuchMethodError:
kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
        at
com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers(KafkaMetadataUtil.java:117)
        at
com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers(KafkaConsumer.java:139)
        at
com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:506)
        at
com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
        at
com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
        at
com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
        at
com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
        at
com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
        at
com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
        at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
        at
com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)





--
View this message in context: http://apache-apex-users-list.78494.x6.nabble.com/How-to-consume-from-two-different-topics-in-one-apex-application-tp1797p1801.html
Sent from the Apache Apex Users list mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: How to consume from two different topics in one apex application

Sunil Parmar-2
One other way to set up Kafka mirror maker to mirror data from 0.8 cluster into 0.9. It's possible that you may hit some conflict in that as well but it's easy to set up so quick to test. Once you do so, you'll have all data in same cluster. You can have shorter retention in 0.9 cluster to manage disk space. 

Sunil Parmar

On Thu, Aug 3, 2017 at 8:39 AM, Thomas Weise <[hidden email]> wrote:
I don't think you can use both Kafka client 0.8.x and 0.9.x within the same application. The dependencies overlap and will conflict. You can use 0.8 client to talk to 0.9 server but since you want to use SSL that's not possible (security was only added in 0.9).

I have not tried that, but you might be able to to use the Maven shade plugin to shade Apex Kafka operator along with the Kafka client so both versions can coexist within one application.

Thomas


On Tue, Aug 1, 2017 at 5:42 AM, rishi <[hidden email]> wrote:
Hi, Thanks for the reply.

I tried to consume from two different topics in same app , I am getting
error (*java.lang.NoSuchMethodError:
kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;*)
.

When I tried consuming from kafka 9 using this(KafkaSinglePortInputOperator)
operator, I was able to do it successfully , but when I am adding another
one more operator(KafkaSinglePortByteArrayInputOperator) to consume from .8
in same dag I am getting the error.

For testing I am not merging kafka output to any operator, it is writing at
two different location in HDFS.

Looks like there is some version issue comming , which I am not able to
identify . Any help is highly appreciated.

My pom.xml looks like this=

<properties>

    <apex.version>3.4.0</apex.version>
    <malhar.version>3.6.0</malhar.version>
    <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath>
    <hadoop.version>2.7.1.2.3.4.0-3485</hadoop.version>
        <hbase.version>1.1.2.2.3.4.0-3485</hbase.version>
        <kafka.version>0.9.0.1</kafka.version>
         <confluent.kafka.version>0.9.0.1-cp1</confluent.kafka.version>
        <kafka.avro.srlzr.version>2.0.1</kafka.avro.srlzr.version>

        <avro.version>1.7.7</avro.version>
        <json.version>1.1</json.version>
        <jodatime.version>2.9.1</jodatime.version>
        <kyroserializer.version>0.38</kyroserializer.version>
        <junit.version>4.10</junit.version>
  </properties>

  <repositories>
        <repository>
            <id>HDPReleases</id>
            <name>HDP Releases</name>

<url>http://repo.hortonworks.com/content/repositories/releases/</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>HDP Jetty Hadoop</id>
            <name>HDP Jetty Hadoop</name>

<url>http://repo.hortonworks.com/content/repositories/jetty-hadoop/</url>
            <layout>default</layout>
        </repository>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven</url>
        </repository>
    </repositories>
        <dependencies>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-library</artifactId>
            <version>${malhar.version}</version>


        </dependency>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>apex-common</artifactId>
            <version>${apex.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>apex-engine</artifactId>
            <version>${apex.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-contrib</artifactId>
            <version>${malhar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.apex</groupId>
            <artifactId>malhar-kafka</artifactId>
            <version>${malhar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>

         <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${confluent.kafka.version}</version>
        </dependency>
          <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
        </dependency>

        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${kafka.avro.srlzr.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>${json.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>${jodatime.version}</version>
        </dependency>

        <dependency>
            <groupId>de.javakaffee</groupId>
            <artifactId>kryo-serializers</artifactId>
            <version>${kyroserializer.version}</version>
        </dependency>
    </dependencies>


My DAG looks like this=>

public void populateDAG(DAG dag, Configuration conf)
  {

    KafkaSinglePortInputOperator kafkaInTtce =
dag.addOperator("Kafka_Input_SSL",new KafkaSinglePortInputOperator());

kafkaInTtce.setInitialPartitionCount(Integer.parseInt(conf.get("kafka.partitioncount")));
    kafkaInTtce.setTopics(conf.get("kafka.ssl.topic"));
    kafkaInTtce.setInitialOffset(conf.get("kafka.offset"));
    kafkaInTtce.setClusters(conf.get("kafka.cluster"));

kafkaInTtce.setConsumerProps(getKafkaProperties(conf.get("kafka.cluster"),
conf));
    kafkaInTtce.setStrategy(conf.get("kafka.strategy"));

    AvroBytesConversionOperator avroConversion =
dag.addOperator("Avro_Convert", new
AvroBytesConversionOperator(conf.get("kafka.schema.registry")));
    ColumnsExtractOperator fieldExtract = dag.addOperator("Field_Extract",
new ColumnsExtractOperator());

    WriteToHdfs hdfs = dag.addOperator("To_HDFS", new
WriteToHdfs(conf.get("hdfs.filename")));
    hdfs.setMaxLength(268435456); // new file rotates after every 256mb

    dag.addStream("Kafka_Avro_Msg_Byte_Stream", kafkaInTtce.outputPort,
avroConversion.input);
    dag.addStream("jsonstring_stream", avroConversion.output,
fieldExtract.input);
    dag.addStream("valid_recs_into_hdfs_stream", fieldExtract.output,
hdfs.input);

    KafkaSinglePortByteArrayInputOperator kafkaInput =
dag.addOperator("Kafka_Input_NonSSL", new
KafkaSinglePortByteArrayInputOperator());
    CopyofAvroBytesConversionOperator avroConversionEstore =
dag.addOperator("Avro_Convert_estore", new
CopyofAvroBytesConversionOperator("http://--------"));
    CopyOfColumnsExtractOperator fieldExtractEstore =
dag.addOperator("Field_Extract_Estore", new CopyOfColumnsExtractOperator());
    WriteToHdfs2 hdfs2 = dag.addOperator("To_HDFS2", new
WriteToHdfs2("DeviceTypeEstore"));
    hdfs2.setMaxLength(268435456);

    dag.addStream("Kafka_Avro_estore_Stream", kafkaInput.outputPort,
avroConversionEstore.input);
    dag.addStream("jsonstring_stream_estore", avroConversionEstore.output,
fieldExtractEstore.input);
    dag.addStream("valid_recs_into_hdfs_estorestream",
fieldExtractEstore.output, hdfs2.input);
  }


Error I am getting(.dt log)=>

2017-08-01 04:57:38,281 INFO  stram.StreamingAppMaster
(StreamingAppMaster.java:main(99)) - Initializing Application Master.
2017-08-01 04:57:38,388 INFO  stram.StreamingAppMasterService
(StreamingAppMasterService.java:serviceInit(537)) - Application master,
appId=507386, clustertimestamp=1500406884031, attemptId=2
2017-08-01 04:57:38,622 WARN  util.NativeCodeLoader
(NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
2017-08-01 04:57:39,441 WARN  shortcircuit.DomainSocketFactory
(DomainSocketFactory.java:<init>(117)) - The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.
2017-08-01 04:57:40,088 INFO  kafka.AbstractKafkaInputOperator
(AbstractKafkaInputOperator.java:initPartitioner(327)) - Initialize
Partitioner
2017-08-01 04:57:40,089 INFO  kafka.AbstractKafkaInputOperator
(AbstractKafkaInputOperator.java:initPartitioner(340)) - Actual Partitioner
is class org.apache.apex.malhar.kafka.OneToManyPartitioner
2017-08-01 04:57:40,121 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:initMetadataClients(234)) - Consumer
Properties :  #
#Tue Aug 01 04:57:40 CDT 2017
security.protocol=SSL
enable.auto.commit=false
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
group.id=org.apache.apex.malhar.kafka.AbstractKafkaInputOperatorMETA_GROUP
ssl.keystore.password=
ssl.truststore.location=/home_dir/client.truststore.jks
bootstrap.servers=
ssl.truststore.password=
ssl.keystore.location=/home_dir/server.keystore.jks
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

2017-08-01 04:57:40,290 INFO  utils.AppInfoParser
(AppInfoParser.java:<init>(82)) - Kafka version : 0.9.0.0
2017-08-01 04:57:40,291 INFO  utils.AppInfoParser
(AppInfoParser.java:<init>(83)) - Kafka commitId : fc7243c2af4b2b4a
2017-08-01 04:57:41,306 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(151)) - Partition change
detected:
2017-08-01 04:57:41,307 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 0
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-21};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-22};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-23};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-24};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-18};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-19};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-20};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-14};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-25};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-26}
2017-08-01 04:57:41,318 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 1
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-6};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-8};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-2};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-3};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-13};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-16};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-9};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-10};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-11};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-28}
2017-08-01 04:57:41,322 INFO  kafka.AbstractKafkaPartitioner
(AbstractKafkaPartitioner.java:definePartitions(170)) - [New] Partition 2
with assignment PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-5};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-7};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-17};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-1};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-4};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-29};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-15};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-0};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-27};PartitionMeta{cluster='10.66.137.93:9093',
topicPartition=firefly-apps-superapp-12}
2017-08-01 04:57:41,365 INFO  zkclient.ZkEventThread
(ZkEventThread.java:run(64)) - Starting ZkClient event thread.
2017-08-01 04:57:41,372 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
2017-08-01 04:57:41,372 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:host.name=brdn1351.target.com
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client environment:java.version=1.8.0_73
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client environment:java.vendor=Oracle
Corporation
2017-08-01 04:57:41,373 INFO  zookeeper.ZooKeeper
(Environment.java:logEnv(100)) - Client
environment:java.home=/usr/java/jdk1.8.0_73/jre
connectString=10.66.137.94:2181,10.66.137.95:2181,10.66.137.96:2181,10.66.137.97:2181,10.66.137.98:2181
sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@6d64b553
2017-08-01 04:57:41,388 INFO  zkclient.ZkClient
(ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state
SyncConnected
2017-08-01 04:57:41,392 INFO  zookeeper.ClientCnxn
(ClientCnxn.java:logStartConnect(975)) - Opening socket connection to server
10.66.137.97/10.66.137.97:2181. Will not attempt to authenticate using SASL
(unknown error)
2017-08-01 04:57:41,393 INFO  zookeeper.ClientCnxn
(ClientCnxn.java:primeConnection(852)) - Socket connection established to
10.66.137.97/10.66.137.97:2181, initiating session
2017-08-01 04:57:41,445 INFO  zookeeper.ClientCnxn
(ClientCnxn.java:onConnected(1235)) - Session establishment complete on
server 10.66.137.97/10.66.137.97:2181, sessionid = 0x350dafbffc66af1,
negotiated timeout = 30000
2017-08-01 04:57:41,447 INFO  zkclient.ZkClient
(ZkClient.java:processStateChanged(711)) - zookeeper state changed
(SyncConnected)
2017-08-01 04:57:41,450 ERROR stram.StreamingAppMaster
(StreamingAppMaster.java:main(106)) - Exiting Application Master
java.lang.NoSuchMethodError:
kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
        at
com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers(KafkaMetadataUtil.java:117)
        at
com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers(KafkaConsumer.java:139)
        at
com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:506)
        at
com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
        at
com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
        at
com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
        at
com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
        at
com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
        at
com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
        at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
        at
com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)





--
View this message in context: http://apache-apex-users-list.78494.x6.nabble.com/How-to-consume-from-two-different-topics-in-one-apex-application-tp1797p1801.html
Sent from the Apache Apex Users list mailing list archive at Nabble.com.