  Help-Desk / HELP-5946

FIWARE.Request.Tech.Data.BigData-Analysis.cygnus - Kafka sink: can't persist data in a remote Kafka instance

    Details

    • Type: extRequest
    • Status: Closed
    • Priority: Major
    • Resolution: Done
    • Fix Version/s: 2021
    • Component/s: FIWARE-TECH-HELP
    • Labels:
      None

      Description

      Hello,

      Is it possible to have Cygnus and Kafka running on different machines?

      My Kafka instance runs on a virtual machine and is accessible through a public IP address.

      Here is my configuration:

      # OrionKafkaSink configuration
      # sink class, must not be changed
      cygnusagent.sinks.kafka-sink.type = com.telefonica.iot.cygnus.sinks.OrionKafkaSink
      # channel name from where to read notification events
      cygnusagent.sinks.kafka-sink.channel = kafka-channel
      # select the Kafka topic type between topic-by-service, topic-by-service-path and topic-by-destination
      cygnusagent.sinks.kafka-sink.topic_type = topic-by-destination
      # comma-separated list of Kafka brokers (a broker is defined as host:port)
      cygnusagent.sinks.kafka-sink.broker_list = publicIP:9092
      # Zookeeper endpoint needed to create Kafka topics, in the form of host:port
      cygnusagent.sinks.kafka-sink.zookeeper_endpoint = publicIP:2181
      cygnusagent.sinks.kafka-sink.batch_size = 1
      cygnusagent.sinks.kafka-sink.batch_timeout = 10

      Here is the error I get in Cygnus: "Error connecting to node 0 at quickstart.cloudera:9092"

      quickstart.cloudera is the name of my virtual machine. I can't figure out why Cygnus is trying to connect to it by name rather than by IP address.

      time=2016-02-22T16:38:08.813UTC | lvl=DEBUG | trans= | srv= | subsrv= | function=initiateConnect | comp=Cygnus | msg=org.apache.kafka.clients.NetworkClient[413] : Initiating connection to node 0 at quickstart.cloudera:9092.
      time=2016-02-22T16:38:08.814UTC | lvl=DEBUG | trans= | srv= | subsrv= | function=initiateConnect | comp=Cygnus | msg=org.apache.kafka.clients.NetworkClient[421] : Error connecting to node 0 at quickstart.cloudera:9092:
      java.io.IOException: Can't resolve address: quickstart.cloudera:9092
          at org.apache.kafka.common.network.Selector.connect(Selector.java:138)
          at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:415)
          at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:116)
          at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:165)
          at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
          at java.lang.Thread.run(Thread.java:745)
      Caused by: java.nio.channels.UnresolvedAddressException
          at sun.nio.ch.Net.checkAddress(Net.java:101)
          at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
          at org.apache.kafka.common.network.Selector.connect(Selector.java:135)
          ... 5 more

      Thanks and Best regards,

      Sabrine FATNASSI

      Test and Validation Engineer

      Easy Global Market: http://www.eglobalmark.com/

      Email: sabrine.fatnassi@eglobalmark.com

      [Created via e-mail received from: "Sabrine Fatnassi (EGM)" <sabrine.fatnassi@eglobalmark.com>]
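      The log shows the Kafka producer connecting to quickstart.cloudera even though broker_list contains an IP address. A likely explanation, not confirmed in this thread: the Kafka client uses broker_list only to bootstrap, then fetches cluster metadata and connects to the host each broker advertises. If the broker on the VM advertises its own hostname, remote clients receive quickstart.cloudera and must be able to resolve it. Below is a sketch of broker-side settings in Kafka's server.properties, assuming you can edit the broker configuration. Which property applies depends on the Kafka version (advertised.listeners from 0.9.0 onward, advertised.host.name/advertised.port in older releases), and 203.0.113.10 is only a placeholder for the VM's public IP.

```properties
# Kafka broker server.properties on the VM (sketch).
# 203.0.113.10 is a placeholder: use the VM's real public IP or a DNS
# name that the Cygnus machine can resolve.

# Kafka 0.9.0 and later:
advertised.listeners=PLAINTEXT://203.0.113.10:9092

# Older 0.8.x brokers use these instead:
#advertised.host.name=203.0.113.10
#advertised.port=9092
```

      After restarting the broker, the metadata it returns should point clients to the public address, which would make a client-side /etc/hosts entry unnecessary.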

        Activity

        frb Francisco Romero added a comment -

        Hi Sabrine,

        Of course, it is possible with Kafka and any other storage by simply
        configuring the proper endpoint in the sink.

        I've seen you have these lines in the configuration:

        cygnusagent.sinks.kafka-sink.broker_list = publicIP:9092

        cygnusagent.sinks.kafka-sink.zookeeper_endpoint = publicIP:2181

        publicIP must be a real IP address or an FQDN (either works). I don't
        know whether you are literally configuring "publicIP" (which is wrong)
        or whether it is a placeholder to hide the IP address from me. Tell me
        which is the case, please. Once it is configured, the only requirement
        is that the endpoint is reachable on ports TCP/9092 and TCP/2181.

        Regards,
        Francisco

        frb Francisco Romero added a comment -

        Hi again,

        OK, so you are actually using an IP address, which Cygnus is somehow translating into quickstart.cloudera through some kind of odd reverse name resolution. I'll investigate it. In the meantime, here is a workaround: add a line to the /etc/hosts file on the Cygnus machine mapping quickstart.cloudera to the VM's IP address. That way, the "Can't resolve address: quickstart.cloudera:9092" error should disappear.
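        For illustration, the /etc/hosts workaround amounts to a single entry on the machine where Cygnus runs. 203.0.113.10 is a placeholder from the documentation address range; substitute the Kafka VM's real public IP.

```
# /etc/hosts on the Cygnus machine (203.0.113.10 is a placeholder
# for the Kafka VM's public IP address)
203.0.113.10    quickstart.cloudera
```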

        Regards,
        Francisco

        On 23/2/16 9:31, "Sabrine Fatnassi (EGM)" <sabrine.fatnassi@eglobalmark.com> wrote:

        Hello,

        Thanks for your response. publicIP is just a placeholder in the configuration I sent; the real address is reachable on TCP/9092 and TCP/2181.
        To check whether it is reachable, can I use "telnet host port", or is there another way to test it?
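        "telnet host port" (or "nc -vz host port") is enough for a quick manual check. If a scripted check is preferred, here is a minimal Python sketch; is_reachable is a hypothetical helper name, and 203.0.113.10 stands in for the real public IP. Note that a successful connection only proves the port is open: as the error above shows, the Kafka client may still be redirected to a hostname it cannot resolve.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the name and tries each returned address
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures
        return False
```

        For example, run from the Cygnus machine, is_reachable("203.0.113.10", 9092) and is_reachable("203.0.113.10", 2181) should both return True if the Kafka and ZooKeeper ports are open.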

        On the other hand, when I set the log level to DEBUG, I saw this error: "Can't resolve address: quickstart.cloudera:9092"
        quickstart.cloudera is the name of the virtual machine hosting the Kafka instance, so this looks strange to me. For information, when I test the Kafka instance locally, it works.

        I also have some questions about the configuration:

        cygnusagent.sinks.kafka-sink.batch_size = 1 //Number of events accumulated before persistence
        -> Does this mean Cygnus sends events to Kafka only after it has accumulated batch_size of them?

        cygnusagent.sinks.kafka-sink.batch_timeout = 10 //Number of seconds the batch will be building before it is persisted as it is
        -> What does this mean?
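        Read together, the two comments describe size-or-timeout batching: a batch is persisted when it reaches batch_size events, or, if it is still incomplete, once batch_timeout seconds have passed and it is persisted as it is. The Python sketch below is a simplified model of that behavior, not Cygnus's actual code; BatchAccumulator, tick and the injected clock are hypothetical names chosen for illustration. Under this model, batch_size = 1 means every notification is persisted as soon as it arrives, so batch_timeout rarely matters.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class BatchAccumulator:
    """Simplified size/timeout batching model (NOT Cygnus's actual code)."""

    batch_size: int            # flush once this many events are buffered
    batch_timeout: float       # flush a partial batch after this many seconds
    persist: Callable[[List[str]], None]  # hypothetical sink-write callback
    clock: Callable[[], float]            # injected time source, in seconds
    _batch: List[str] = field(default_factory=list)
    _started_at: float = 0.0

    def add(self, event: str) -> None:
        if not self._batch:
            # The timeout window starts with the first event of a new batch
            self._started_at = self.clock()
        self._batch.append(event)
        if len(self._batch) >= self.batch_size:
            self.flush()

    def tick(self) -> None:
        # Called periodically; persists a partial batch once it is old enough
        if self._batch and self.clock() - self._started_at >= self.batch_timeout:
            self.flush()

    def flush(self) -> None:
        # Hand a copy to the sink, then start a fresh batch
        self.persist(list(self._batch))
        self._batch.clear()
```

        Injecting the clock keeps the model deterministic: a test can advance a fake time value and call tick() instead of sleeping.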

        Thanks and Best regards,
        Sabrine FATNASSI
        Test and Validation Engineer

        Easy Global Market : http://www.eglobalmark.com/
        Email : sabrine.fatnassi@eglobalmark.com

        frb Francisco Romero added a comment -

        On 23/2/16 11:05, "Sabrine Fatnassi (EGM)" <sabrine.fatnassi@eglobalmark.com> wrote:

        Hi,

        Thanks for the workaround, it works. Please let me know if you find the cause.

        Best regards,
        Sabrine FATNASSI
        Test and Validation Engineer

        Easy Global Market : http://www.eglobalmark.com/
        Email : sabrine.fatnassi@eglobalmark.com


          People

          • Assignee: frb Francisco Romero
          • Reporter: fw.ext.user FW External User
          • Votes: 0
          • Watchers: 2

            Dates

            • Created:
              Updated:
              Resolved: