From TonyM, 9 Months ago, written in Plain Text.
Embed
  1. Index: LibreNMS/service.py
  2. IDEA additional info:
  3. Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
  4. <+>UTF-8
  5. ===================================================================
  6. --- LibreNMS/service.py (revision 61e877b14f3a071a7b28e32d3baf426237b38214)
  7. +++ LibreNMS/service.py (date 1524413684000)
  8. @@ -188,7 +188,7 @@
  9.          self.db_host = config.get('db_host', ServiceConfig.db_host)
  10.          self.db_name = config.get('db_name', ServiceConfig.db_name)
  11.          self.db_pass = config.get('db_pass', ServiceConfig.db_pass)
  12. -        self.db_port = config.get('db_port', ServiceConfig.db_port)
  13. +        self.db_port = int(config.get('db_port', ServiceConfig.db_port))
  14.          self.db_socket = config.get('db_socket', ServiceConfig.db_socket)
  15.          self.db_user = config.get('db_user', ServiceConfig.db_user)
  16.  
  17. @@ -323,10 +323,7 @@
  18.          for device in devices:
  19.              self.discovery_manager.post_work(device[0], device[1])
  20.  
  21. -    def discover_device(self, device_id, group):
  22. -        if not self.verify_queue(device_id, group):
  23. -            return False
  24. -
  25. +    def discover_device(self, device_id):
  26.          if self.lock_discovery(device_id):
  27.              try:
  28.                  with TimeitContext.start() as t:
  29. @@ -361,10 +358,7 @@
  30.          for device in devices:
  31.              self.services_manager.post_work(device[0], device[1])
  32.  
  33. -    def poll_services(self, device_id, group):
  34. -        if not self.verify_queue(device_id, group):
  35. -            return False
  36. -
  37. +    def poll_services(self, device_id):
  38.          with TimeitContext.start() as t:
  39.              info("Checking services on device {}".format(device_id))
  40.              self.call_script('check-services.php', ('-h', device_id))
  41. @@ -377,7 +371,7 @@
  42.      def dispatch_poll_billing(self):
  43.          self.billing_manager.post_work('poll')
  44.  
  45. -    def poll_billing(self, run_type, group=None):
  46. +    def poll_billing(self, run_type):
  47.          if run_type == 'poll':
  48.              info("Polling billing")
  49.              self.call_script('poll-billing.php')
  50. @@ -399,10 +393,8 @@
  51.                      debug("Dispatching polling for device {}, time since last poll {:.2f}s"
  52.                            .format(device_id, elapsed))
  53.  
  54. -    def poll_device(self, device_id, group):
  55. -        if not self.verify_queue(device_id, group):
  56. -            return False
  57. -
  58. +    def poll_device(self, device_id):
  59. +        error("Connections: {}".format(len(self._db._db)))
  60.          if self.lock_polling(device_id):
  61.              info('Polling device {}'.format(device_id))
  62.  
  63. @@ -652,12 +644,3 @@
  64.                         'last_polled=values(last_polled), devices=values(devices), time_taken=values(time_taken);'
  65.                         .format(self.config.name, jobs, time))
  66.  
  67. -    def verify_queue(self, device_id, group):
  68. -        devices = self.fetch_device(device_id)
  69. -
  70. -        for device in devices:
  71. -            if device[1] != group:
  72. -                error("Device {} has been entered into queue {} instead of queue {}".format(device[0], group, device[1]))
  73. -                return False
  74. -
  75. -        return True
  76. Index: LibreNMS/queuemanager.py
  77. IDEA additional info:
  78. Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
  79. <+>UTF-8
  80. ===================================================================
  81. --- LibreNMS/queuemanager.py (revision 61e877b14f3a071a7b28e32d3baf426237b38214)
  82. +++ LibreNMS/queuemanager.py (date 1524413910000)
  83. @@ -40,6 +40,7 @@
  84.          self._work_function = work_function
  85.          self._stop_event = threading.Event()
  86.  
  87. +        info("Groups: {}".format(self.config.group))
  88.          info("{} QueueManager created: {} workers, {}s frequency"
  89.               .format(self.type.title(), self.get_poller_config().workers, self.get_poller_config().frequency))
  90.  
  91. @@ -49,13 +50,12 @@
  92.      def _service_worker(self, work_func, queue_id):
  93.          while not self._stop_event.is_set():
  94.              try:
  95. -                for queue in random.sample(queue_id, len(queue_id)):
  96. -                    # cannot break blocking request with redis-py, so timeout :(
  97. -                    device_id = self.get_queue(queue).get(True, 3)
  98. +                # cannot break blocking request with redis-py, so timeout :(
  99. +                device_id = self.get_queue(queue_id).get(True, 3)
  100.  
  101. -                    if device_id:  # None returned by redis after timeout when empty
  102. -                        info("Worker attached to queues: {} removed job from queue {}".format(queue_id, queue))
  103. -                        work_func(device_id, queue)
  104. +                if device_id:  # None returned by redis after timeout when empty
  105. +                    error("Queues: {}".format(self._queues))
  106. +                    work_func(device_id)
  107.              except Empty:
  108.                  pass  # ignore empty queue exception from subprocess.Queue
  109.              except CalledProcessError as e:
  110. @@ -79,15 +79,18 @@
  111.          """
  112.          Start worker threads
  113.          """
  114. -        workers = max(self.get_poller_config().workers, 1)
  115. -        for i in range(workers):
  116. -            thread_name = "{}-{}".format(self.type.title(), i + 1)
  117. -            pt = threading.Thread(target=self._service_worker, name=thread_name,
  118. -                                  args=(self._work_function, self.config.group))
  119. -            pt.daemon = True
  120. -            self._threads.append(pt)
  121. -            pt.start()
  122. -        debug("Started {} {} threads".format(workers, self.type))
  123. +        workers = self.get_poller_config().workers
  124. +        groups = self.config.group if hasattr(self.config.group, "__iter__") else [self.config.group]
  125. +        for group in groups:
  126. +            group_workers = max(int(workers / len(groups)), 1)
  127. +            for i in range(group_workers):
  128. +                thread_name = "{}_{}-{}".format(self.type.title(), group, i + 1)
  129. +                pt = threading.Thread(target=self._service_worker, name=thread_name,
  130. +                                      args=(self._work_function, group))
  131. +                pt.daemon = True
  132. +                self._threads.append(pt)
  133. +                pt.start()
  134. +            debug("Started {} {} threads for group {}".format(group_workers, self.type, group))
  135.  
  136.      def restart(self):
  137.          """
  138.