- Index: LibreNMS/service.py
- IDEA additional info:
- Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
- <+>UTF-8
- ===================================================================
- --- LibreNMS/service.py (revision 61e877b14f3a071a7b28e32d3baf426237b38214)
- +++ LibreNMS/service.py (date 1524413684000)
- @@ -188,7 +188,7 @@
- self.db_host = config.get('db_host', ServiceConfig.db_host)
- self.db_name = config.get('db_name', ServiceConfig.db_name)
- self.db_pass = config.get('db_pass', ServiceConfig.db_pass)
- - self.db_port = config.get('db_port', ServiceConfig.db_port)
- + self.db_port = int(config.get('db_port', ServiceConfig.db_port))
- self.db_socket = config.get('db_socket', ServiceConfig.db_socket)
- self.db_user = config.get('db_user', ServiceConfig.db_user)
- @@ -323,10 +323,7 @@
- for device in devices:
- self.discovery_manager.post_work(device[0], device[1])
- - def discover_device(self, device_id, group):
- - if not self.verify_queue(device_id, group):
- - return False
- -
- + def discover_device(self, device_id):
- if self.lock_discovery(device_id):
- try:
- with TimeitContext.start() as t:
- @@ -361,10 +358,7 @@
- for device in devices:
- self.services_manager.post_work(device[0], device[1])
- - def poll_services(self, device_id, group):
- - if not self.verify_queue(device_id, group):
- - return False
- -
- + def poll_services(self, device_id):
- with TimeitContext.start() as t:
- info("Checking services on device {}".format(device_id))
- self.call_script('check-services.php', ('-h', device_id))
- @@ -377,7 +371,7 @@
- def dispatch_poll_billing(self):
- self.billing_manager.post_work('poll')
- - def poll_billing(self, run_type, group=None):
- + def poll_billing(self, run_type):
- if run_type == 'poll':
- info("Polling billing")
- self.call_script('poll-billing.php')
- @@ -399,10 +393,8 @@
- debug("Dispatching polling for device {}, time since last poll {:.2f}s"
- .format(device_id, elapsed))
- - def poll_device(self, device_id, group):
- - if not self.verify_queue(device_id, group):
- - return False
- -
- + def poll_device(self, device_id):
- + error("Connections: {}".format(len(self._db._db)))
- if self.lock_polling(device_id):
- info('Polling device {}'.format(device_id))
- @@ -652,12 +644,3 @@
- 'last_polled=values(last_polled), devices=values(devices), time_taken=values(time_taken);'
- .format(self.config.name, jobs, time))
- - def verify_queue(self, device_id, group):
- - devices = self.fetch_device(device_id)
- -
- - for device in devices:
- - if device[1] != group:
- - error("Device {} has been entered into queue {} instead of queue {}".format(device[0], group, device[1]))
- - return False
- -
- - return True
- Index: LibreNMS/queuemanager.py
- IDEA additional info:
- Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
- <+>UTF-8
- ===================================================================
- --- LibreNMS/queuemanager.py (revision 61e877b14f3a071a7b28e32d3baf426237b38214)
- +++ LibreNMS/queuemanager.py (date 1524413910000)
- @@ -40,6 +40,7 @@
- self._work_function = work_function
- self._stop_event = threading.Event()
- + info("Groups: {}".format(self.config.group))
- info("{} QueueManager created: {} workers, {}s frequency"
- .format(self.type.title(), self.get_poller_config().workers, self.get_poller_config().frequency))
- @@ -49,13 +50,12 @@
- def _service_worker(self, work_func, queue_id):
- while not self._stop_event.is_set():
- try:
- - for queue in random.sample(queue_id, len(queue_id)):
- - # cannot break blocking request with redis-py, so timeout :(
- - device_id = self.get_queue(queue).get(True, 3)
- + # cannot break blocking request with redis-py, so timeout :(
- + device_id = self.get_queue(queue_id).get(True, 3)
- - if device_id: # None returned by redis after timeout when empty
- - info("Worker attached to queues: {} removed job from queue {}".format(queue_id, queue))
- - work_func(device_id, queue)
- + if device_id: # None returned by redis after timeout when empty
- + error("Queues: {}".format(self._queues))
- + work_func(device_id)
- except Empty:
- pass # ignore empty queue exception from subprocess.Queue
- except CalledProcessError as e:
- @@ -79,15 +79,18 @@
- """
- Start worker threads
- """
- - workers = max(self.get_poller_config().workers, 1)
- - for i in range(workers):
- - thread_name = "{}-{}".format(self.type.title(), i + 1)
- - pt = threading.Thread(target=self._service_worker, name=thread_name,
- - args=(self._work_function, self.config.group))
- - pt.daemon = True
- - self._threads.append(pt)
- - pt.start()
- - debug("Started {} {} threads".format(workers, self.type))
- + workers = self.get_poller_config().workers
- + groups = self.config.group if hasattr(self.config.group, "__iter__") else [self.config.group]
- + for group in groups:
- + group_workers = max(int(workers / len(groups)), 1)
- + for i in range(group_workers):
- + thread_name = "{}_{}-{}".format(self.type.title(), group, i + 1)
- + pt = threading.Thread(target=self._service_worker, name=thread_name,
- + args=(self._work_function, group))
- + pt.daemon = True
- + self._threads.append(pt)
- + pt.start()
- + debug("Started {} {} threads for group {}".format(group_workers, self.type, group))
- def restart(self):
- """