/usr/lib/python2.7/dist-packages/shinken/scheduler.py is in shinken-common 1.4-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 | #!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2009-2012:
#     Gabes Jean, naparuba@gmail.com
#     Gerhard Lausser, Gerhard.Lausser@consol.de
#     Gregory Starck, g.starck@gmail.com
#     Hartmut Goebel, h.goebel@goebel-consult.de
#
# This file is part of Shinken.
#
# Shinken is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Shinken is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with Shinken.  If not, see <http://www.gnu.org/licenses/>.
import time
import os
import cStringIO
import sys
import socket
import tempfile
import traceback
from Queue import Empty
try:
    import shinken.pyro_wrapper as pyro
except ImportError:
    sys.exit("Shinken require the Python Pyro module. Please install it.")
from shinken.pyro_wrapper import Pyro
from shinken.external_command import ExternalCommand
from shinken.check import Check
from shinken.notification import Notification
from shinken.eventhandler import EventHandler
from shinken.brok import Brok
from shinken.downtime import Downtime
from shinken.contactdowntime import ContactDowntime
from shinken.comment import Comment
from shinken.acknowledge import Acknowledge
from shinken.log import logger
from shinken.util import nighty_five_percent
from shinken.load import Load
# Pack of common Pyro exceptions
Pyro_exp_pack = (Pyro.errors.ProtocolError, Pyro.errors.URIError, \
                    Pyro.errors.CommunicationError, \
                    Pyro.errors.DaemonError)
class Scheduler:
    """Please Add a Docstring to describe the class here"""
    def __init__(self, scheduler_daemon):
        self.sched_daemon = scheduler_daemon
        # When set to false by us, we die and arbiter launch a new Scheduler
        self.must_run = True
        self.waiting_results = []  # satellites returns us results
        # and to not wait for them, we put them here and
        # use them later
        # Every N seconds we call functions like consume, del zombies
        # etc. All of theses functions are in recurrent_works with the
        # every tick to run. So must be an integer > 0
        # The order is important, so make key an int.
        # TODO: at load, change value by configuration one (like reaper time, etc)
        self.recurrent_works = {
            0: ('update_downtimes_and_comments', self.update_downtimes_and_comments, 1),
            1: ('schedule', self.schedule, 1), # just schedule
            2: ('consume_results', self.consume_results, 1), # incorporate checks and dependencies
            3: ('get_new_actions', self.get_new_actions, 1), # now get the news actions (checks, notif) raised
            4: ('get_new_broks', self.get_new_broks, 1), # and broks
            5: ('delete_zombie_checks', self.delete_zombie_checks, 1),
            6: ('delete_zombie_actions', self.delete_zombie_actions, 1),
            # 3: (self.delete_unwanted_notifications, 1),
            7: ('check_freshness', self.check_freshness, 10),
            8: ('clean_caches', self.clean_caches, 1),
            9: ('update_retention_file', self.update_retention_file, 3600),
            10: ('check_orphaned', self.check_orphaned, 60),
            # For NagVis like tools: update our status every 10s
            11: ('get_and_register_update_program_status_brok', self.get_and_register_update_program_status_brok, 10),
            # Check for system time change. And AFTER get new checks
            # so they are changed too.
            12: ('check_for_system_time_change', self.sched_daemon.check_for_system_time_change, 1),
            # launch if need all internal checks
            13: ('manage_internal_checks', self.manage_internal_checks, 1),
            # clean some times possible overridden Queues, to do not explode in memory usage
            # every 1/4 of hour
            14: ('clean_queues', self.clean_queues, 1),
            # Look for new business_impact change by modulation every minute
            15: ('update_business_values', self.update_business_values, 60),
            # Reset the topology change flag if need
            16: ('reset_topology_change_flag', self.reset_topology_change_flag, 1),
            17: ('check_for_expire_acknowledge', self.check_for_expire_acknowledge, 1),
            18: ('send_broks_to_modules', self.send_broks_to_modules, 1),
            19: ('get_objects_from_from_queues', self.get_objects_from_from_queues, 1),
        }
        # stats part
        self.nb_checks_send = 0
        self.nb_actions_send = 0
        self.nb_broks_send = 0
        self.nb_check_received = 0
        # Log init
        logger.load_obj(self)
        self.instance_id = 0  # Temporary set. Will be erase later
        # Ours queues
        self.checks = {}
        self.actions = {}
        self.downtimes = {}
        self.contact_downtimes = {}
        self.comments = {}
        self.broks = {}
        # Some flags
        self.has_full_broks = False  # have a initial_broks in broks queue?
        self.need_dump_memory = False  # set by signal 1
        self.need_objects_dump = False #set by signal 2
        # And a dummy push flavor
        self.push_flavor = 0
        # Now fake initialize for our satellites
        self.brokers = {}
        self.pollers = {}
        self.reactionners = {}
    def reset(self):
        self.must_run = True
        del self.waiting_results[:]
        for o in self.checks, self.actions, self.downtimes, self.contact_downtimes, self.comments, self.broks, self.brokers:
            o.clear()
        
    # Load conf for future use
    # we are in_test if the data are from an arbiter object like,
    # so only for tests
    def load_conf(self, conf, in_test=False):
        self.program_start = int(time.time())
        self.conf = conf
        self.hostgroups = conf.hostgroups
        self.hostgroups.create_reversed_list()
        self.services = conf.services
        # We need reversed list for search in the retention
        # file read
        self.services.create_reversed_list()
        self.services.optimize_service_search(conf.hosts)
        self.hosts = conf.hosts
        self.hosts.create_reversed_list()
        self.notificationways = conf.notificationways
        self.checkmodulations = conf.checkmodulations
        self.macromodulations = conf.macromodulations
        self.contacts = conf.contacts
        self.contacts.create_reversed_list()
        self.contactgroups = conf.contactgroups
        self.contactgroups.create_reversed_list()
        self.servicegroups = conf.servicegroups
        self.servicegroups.create_reversed_list()
        self.timeperiods = conf.timeperiods
        self.timeperiods.create_reversed_list()
        self.commands = conf.commands
        self.commands.create_reversed_list()
        self.triggers = conf.triggers
        self.triggers.create_reversed_list()
        self.triggers.compile()
        self.triggers.load_objects(self)
        if not in_test:
            # Commands in the host/services/contacts are not real one
            # we must relink them
            t0 = time.time()
            self.conf.late_linkify()
            logger.debug("Late command relink in %d" % (time.time() - t0))
        # self.status_file = StatusFile(self)        # External status file
        self.instance_id = conf.instance_id  # From Arbiter. Use for
                                            # Broker to differentiate
                                            # schedulers
        # Tag our hosts with our instance_id
        for h in self.hosts:
            h.instance_id = conf.instance_id
        for s in self.services:
            s.instance_id = conf.instance_id
        # self for instance_name
        self.instance_name = conf.instance_name
        # and push flavor
        self.push_flavor = conf.push_flavor
        # Now we can update our 'ticks' for special calls
        # like the retention one, etc
        self.update_recurrent_works_tick('update_retention_file', self.conf.retention_update_interval * 60)
        self.update_recurrent_works_tick('clean_queues', self.conf.cleaning_queues_interval)
    # Update the 'tick' for a function call in our
    # recurrent work
    def update_recurrent_works_tick(self, f_name, new_tick):
        for i in self.recurrent_works:
            (name, f, old_tick) = self.recurrent_works[i]
            if name == f_name:
                logger.debug("Changing the tick to %d for the function %s" % (new_tick, name))
                self.recurrent_works[i] = (name, f, new_tick)
    # Load the pollers from our app master
    def load_satellites(self, pollers, reactionners):
        self.pollers = pollers
        self.reactionners = reactionners
    # Oh... Arbiter want us to die... To launch a new Scheduler
    # "Mais qu'a-t-il de plus que je n'ais pas?"
    # "But.. On which point it is better than me?"
    def die(self):
        self.must_run = False
    def dump_objects(self):
        d = tempfile.gettempdir()
        p = os.path.join(d, 'scheduler-obj-dump-%d' % time.time())
        logger.info('Opening the DUMP FILE %s' % (p))
        try:
            f = open(p, 'w')
            f.write('Scheduler DUMP at %d\n' % time.time())
            for c in self.checks.values():
                s = 'CHECK: %s:%s:%s:%s:%s:%s\n' % (c.id, c.status, c.t_to_go, c.poller_tag, c.command, c.worker)
                f.write(s)
            for a in self.actions.values():
                s = '%s: %s:%s:%s:%s:%s:%s\n' % (a.__class__.my_type.upper(), a.id, a.status, a.t_to_go, a.reactionner_tag, a.command, a.worker)
                f.write(s)                
            for b in self.broks.values():
                s = 'BROK: %s:%s\n' % (b.id, b.type)
                f.write(s)
            f.close()
        except Exception, exp:
            logger.error("Error in writing the dump file %s : %s" % (p, str(exp)))
    # Load the external command
    def load_external_command(self, e):
        self.external_command = e
    # We've got activity in the fifo, we get and run commands
    def run_external_commands(self, cmds):
        for command in cmds:
            self.run_external_command(command)
    def run_external_command(self, command):
        logger.debug("scheduler resolves command '%s'" % command)
        ext_cmd = ExternalCommand(command)
        self.external_command.resolve_command(ext_cmd)
    # Add_Brok is a bit more complex than the others, because
    # on starting, the broks are put in a global queue : self.broks
    # then when the first broker connect, it will generate initial_broks
    # in it's own queue (so bname != None).
    # and when in "normal" run, we just need to put the brok to all queues
    def add_Brok(self, brok, bname=None):
        # For brok, we TAG brok with our instance_id
        brok.instance_id = self.instance_id
        # Maybe it's just for one broker
        if bname:
            broks = self.brokers[bname]['broks']
            broks[brok.id] = brok
        else:
            # If there are known brokers, give it to them
            if len(self.brokers) > 0:
                # Or maybe it's for all
                for bname in self.brokers:
                    broks = self.brokers[bname]['broks']
                    broks[brok.id] = brok
            else: # no brokers? maybe at startup for logs
                # we will put in global queue, that the first broker
                # connexion will get all
                self.broks[brok.id] = brok
    def add_Notification(self, notif):
        self.actions[notif.id] = notif
        # A notification ask for a brok
        if notif.contact is not None:
            b = notif.get_initial_status_brok()
            self.add(b)
    def add_Check(self, c):
        self.checks[c.id] = c
        # A new check means the host/service changes its next_check
        # need to be refreshed
        b = c.ref.get_next_schedule_brok()
        self.add(b)
    def add_EventHandler(self, action):
        # print "Add an event Handler", elt.id
        self.actions[action.id] = action
    def add_Downtime(self, dt):
        self.downtimes[dt.id] = dt
        if dt.extra_comment:
            self.add_Comment(dt.extra_comment)
    def add_ContactDowntime(self, contact_dt):
        self.contact_downtimes[contact_dt.id] = contact_dt
    def add_Comment(self, comment):
        self.comments[comment.id] = comment
        b = comment.ref.get_update_status_brok()
        self.add(b)
    # Ok one of our modules send us a command? just run it!
    def add_ExternalCommand(self, ext_cmd):
        self.external_command.resolve_command(ext_cmd)
    # Schedulers have some queues. We can simplify call by adding
    # elements into the proper queue just by looking at their type
    # Brok -> self.broks
    # Check -> self.checks
    # Notification -> self.actions
    # Downtime -> self.downtimes
    # ContactDowntime -> self.contact_downtimes
    def add(self, elt):
        f = self.__add_actions.get(elt.__class__, None)
        if f:
            #print("found action for %s: %s" % (elt.__class__.__name__, f.__name__))
            f(self, elt)
    __add_actions = {
        Check:              add_Check,
        Brok:               add_Brok,
        Notification:       add_Notification,
        EventHandler:       add_EventHandler,
        Downtime:           add_Downtime,
        ContactDowntime:    add_ContactDowntime,
        Comment:            add_Comment,
        ExternalCommand:    add_ExternalCommand,
    }
    # We call the function of modules that got the
    # hook function
    # TODO: find a way to merge this and the version in daemon.py
    def hook_point(self, hook_name):
        for inst in self.sched_daemon.modules_manager.instances:
            full_hook_name = 'hook_' + hook_name
            logger.debug("hook_point: %s: %s %s" % (inst.get_name(), str(hasattr(inst, full_hook_name)), hook_name))
            if hasattr(inst, full_hook_name):
                f = getattr(inst, full_hook_name)
                try:
                    f(self)
                except Exception, exp:
                    logger.error("The instance %s raise an exception %s. I disable it and set it to restart it later" % (inst.get_name(), str(exp)))
                    output = cStringIO.StringIO()
                    traceback.print_exc(file=output)
                    logger.error("Exception trace follows: %s" % (output.getvalue()))
                    output.close()
                    self.sched_daemon.modules_manager.set_to_restart(inst)
    # Ours queues may explode if no one ask us for elements
    # It's very dangerous: you can crash your server... and it's a bad thing :)
    # So we 'just' keep last elements: 5 of max is a good overhead
    def clean_queues(self):
        # if we set the interval at 0, we bail out
        if self.conf.cleaning_queues_interval == 0:
            return
        max_checks = 5 * (len(self.hosts) + len(self.services))
        max_broks = 5 * (len(self.hosts) + len(self.services))
        max_actions = 5 * len(self.contacts) * (len(self.hosts) + len(self.services))
        # For checks, it's not very simple:
        # For checks, they may be referred to their host/service
        # We do not just del them in the check list, but also in their service/host
        # We want id of lower than max_id - 2*max_checks
        if len(self.checks) > max_checks:
            id_max = self.checks.keys()[-1]  # The max id is the last id
                                            #: max is SO slow!
            to_del_checks = [c for c in self.checks.values() if c.id < id_max - max_checks]
            nb_checks_drops = len(to_del_checks)
            if nb_checks_drops > 0:
                logger.info("I have to del some checks (%d)..., sorry" % nb_checks_drops)
            for c in to_del_checks:
                i = c.id
                elt = c.ref
                # First remove the link in host/service
                elt.remove_in_progress_check(c)
                # Then in dependent checks (I depend on, or check
                # depend on me)
                for dependent_checks in c.depend_on_me:
                    dependent_checks.depend_on.remove(c.id)
                for c_temp in c.depend_on:
                    c_temp.depen_on_me.remove(c)
                del self.checks[i]  # Final Bye bye ...
        else:
            nb_checks_drops = 0
        # For broks and actions, it's more simple
        # or brosk, manage global but also all brokers queue
        b_lists = [self.broks]
        for (bname, e) in self.brokers.iteritems():
            b_lists.append(e['broks'])
        for broks in b_lists:
            if len(broks) > max_broks:
                id_max = broks.keys()[-1]
                id_to_del_broks = [i for i in broks if i < id_max - max_broks]
                nb_broks_drops = len(id_to_del_broks)
                for i in id_to_del_broks:
                    del broks[i]
            else:
                nb_broks_drops = 0
        if len(self.actions) > max_actions:
            id_max = self.actions.keys()[-1]
            id_to_del_actions = [i for i in self.actions if i < id_max - max_actions]
            nb_actions_drops = len(id_to_del_actions)
            for i in id_to_del_actions:
                # Remember to delete reference of notification in service/host
                a = self.actions[i]
                if a.is_a == 'notification':
                    a.ref.remove_in_progress_notification(a)
                del self.actions[i]
        else:
            nb_actions_drops = 0
        if nb_checks_drops != 0 or nb_broks_drops != 0 or nb_actions_drops != 0:
            logger.warning("We drop %d checks, %d broks and %d actions" % (nb_checks_drops, nb_broks_drops, nb_actions_drops))
    # For tunning purpose we use caches but we do not want them to explode
    # So we clean them
    def clean_caches(self):
        for tp in self.timeperiods:
            tp.clean_cache()
    # Ask item (host or service) an update_status
    # and add it to our broks queue
    def get_and_register_status_brok(self, item):
        b = item.get_update_status_brok()
        self.add(b)
    # Ask item (host or service) a check_result_brok
    # and add it to our broks queue
    def get_and_register_check_result_brok(self, item):
        b = item.get_check_result_brok()
        self.add(b)
    # We do not want this downtime id
    def del_downtime(self, dt_id):
        if dt_id in self.downtimes:
            self.downtimes[dt_id].ref.del_downtime(dt_id)
            del self.downtimes[dt_id]
    # We do not want this downtime id
    def del_contact_downtime(self, dt_id):
        if dt_id in self.contact_downtimes:
            self.contact_downtimes[dt_id].ref.del_downtime(dt_id)
            del self.contact_downtimes[dt_id]
    # We do not want this comment id
    def del_comment(self, c_id):
        if c_id in self.comments:
            self.comments[c_id].ref.del_comment(c_id)
            del self.comments[c_id]
    # We are looking for outdated acks, and if so, remove them
    def check_for_expire_acknowledge(self):
        for t in [self.hosts, self.services]:
            for i in t:
                i.check_for_expire_acknowledge()
    # We update all business_impact to look at new modulation
    # start for impacts, and so update broks status and
    # problems value too
    def update_business_values(self):
        for t in [self.hosts, self.services]:
            # We first update impacts and classic elements
            for i in [i for i in t if not i.is_problem]:
                was = i.business_impact
                i.update_business_impact_value()
                new = i.business_impact
                # Ok, the business_impact change, we can update the broks
                if new != was:
                    #print "The elements", i.get_name(), "change it's business_impact value"
                    self.get_and_register_status_brok(i)
        # When all impacts and classic elements are updated,
        # we can update problems (their value depend on impacts, so
        # they must be done after)
        for t in [self.hosts, self.services]:
            # We first update impacts and classic elements
            for i in [i for i in t if i.is_problem]:
                was = i.business_impact
                i.update_business_impact_value()
                new = i.business_impact
                # Maybe one of the impacts change it's business_impact to a high value
                # and so ask for the problem to raise too
                if new != was:
                    #print "The elements", i.get_name(), "change it's business_impact value from", was, "to", new
                    self.get_and_register_status_brok(i)
    # Called by poller to get checks
    # Can get checks and actions (notifications and co)
    def get_to_run_checks(self, do_checks=False, do_actions=False,
                          poller_tags=['None'], reactionner_tags=['None'], \
                              worker_name='none', module_types=['fork']
                          ):
        res = []
        now = time.time()
        # If poller want to do checks
        if do_checks:
            for c in self.checks.values():
                #  If the command is untagged, and the poller too, or if both are tagged
                #  with same name, go for it
                # if do_check, call for poller, and so poller_tags by default is ['None']
                # by default poller_tag is 'None' and poller_tags is ['None']
                # and same for module_type, the default is the 'fork' type
                if c.poller_tag in poller_tags and c.module_type in module_types:
                    # must be ok to launch, and not an internal one (business rules based)
                    if c.status == 'scheduled' and c.is_launchable(now) and not c.internal:
                        c.status = 'inpoller'
                        c.worker = worker_name
                        # We do not send c, because it is a link (c.ref) to
                        # host/service and poller do not need it. It only
                        # need a shell with id, command and defaults
                        # parameters. It's the goal of copy_shell
                        res.append(c.copy_shell())
        # If reactionner want to notify too
        if do_actions:
            for a in self.actions.values():
                # if do_action, call the reactionner, and so reactionner_tags by default is ['None']
                # by default reactionner_tag is 'None' and reactionner_tags is ['None'] too
                # So if not the good one, loop for next :)
                if not a.reactionner_tag in reactionner_tags:
                    continue
                # same for module_type
                if not a.module_type in module_types:
                    continue
                # And now look for can launch or not :)
                if a.status == 'scheduled' and a.is_launchable(now):
                    a.status = 'inpoller'
                    a.worker = worker_name
                    if a.is_a == 'notification' and not a.contact:
                        # This is a "master" notification created by create_notifications.
                        # It wont sent itself because it has no contact.
                        # We use it to create "child" notifications (for the contacts and
                        # notification_commands) which are executed in the reactionner.
                        item = a.ref
                        childnotifications = []
                        if not item.notification_is_blocked_by_item(a.type, now):
                            # If it is possible to send notifications of this type at the current time, then create
                            # a single notification for each contact of this item.
                            childnotifications = item.scatter_notification(a)
                            for c in childnotifications:
                                c.status = 'inpoller'
                                self.add(c)  # this will send a brok
                                new_c = c.copy_shell()
                                res.append(new_c)
                        # If we have notification_interval then schedule the next notification (problems only)
                        if a.type == 'PROBLEM':
                            # Update the ref notif number after raise the one of the notification
                            if len(childnotifications) != 0:
                                # notif_nb of the master notification was already current_notification_number+1.
                                # If notifications were sent, then host/service-counter will also be incremented
                                item.current_notification_number = a.notif_nb
                            if item.notification_interval != 0 and a.t_to_go is not None:
                                # We must continue to send notifications.
                                # Just leave it in the actions list and set it to "scheduled" and it will be found again later
                                # Ask the service/host to compute the next notif time. It can be just
                                # a.t_to_go + item.notification_interval * item.__class__.interval_length
                                # or maybe before because we have an escalation that need to raise up before
                                a.t_to_go = item.get_next_notification_time(a)
                                
                                a.notif_nb = item.current_notification_number + 1
                                a.status = 'scheduled'
                            else:
                                # Wipe out this master notification. One problem notification is enough.
                                item.remove_in_progress_notification(a)
                                self.actions[a.id].status = 'zombie'
                        else:
                            # Wipe out this master notification. We don't repeat recover/downtime/flap/etc...
                            item.remove_in_progress_notification(a)
                            self.actions[a.id].status = 'zombie'
                    else:
                        # This is for child notifications and eventhandlers
                        new_a = a.copy_shell()
                        res.append(new_a)
        return res
    # Called by poller and reactionner to send result
    def put_results(self, c):
        if c.is_a == 'notification':
            # We will only see childnotifications here
            try:
                timeout = False
                if c.status == 'timeout':
                    # Unfortunately the remove_in_progress_notification
                    # sets the status to zombie, so we need to save it here.
                    timeout = True
                    execution_time = c.execution_time
                # Add protection for strange charset
                if isinstance(c.output, str):
                    c.output = c.output.decode('utf8', 'ignore')
                self.actions[c.id].get_return_from(c)
                item = self.actions[c.id].ref
                item.remove_in_progress_notification(c)
                self.actions[c.id].status = 'zombie'
                item.last_notification = c.check_time
                # And we ask the item to update it's state
                self.get_and_register_status_brok(item)
                # If we' ve got a problem with the notification, raise a Warning log
                if timeout:
                    logger.warning("Contact %s %s notification command '%s ' timed out after %d seconds" %
                                   (self.actions[c.id].contact.contact_name,
                                    self.actions[c.id].ref.__class__.my_type,
                                    self.actions[c.id].command,
                                    int(execution_time)))
                elif c.exit_status != 0:
                    logger.warning("The notification command '%s' raised an error (exit code=%d): '%s'" % (c.command, c.exit_status, c.output))
            except KeyError, exp:  # bad number for notif, not that bad
                logger.warning('put_results:: get unknown notification : %s ' % str(exp))
            except AttributeError, exp:  # bad object, drop it
                logger.warning('put_results:: get bad notification : %s ' % str(exp))
            
        elif c.is_a == 'check':
            try:
                if c.status == 'timeout':
                    c.output = "(%s Check Timed Out)" % self.checks[c.id].ref.__class__.my_type.capitalize()
                    c.long_output = c.output
                self.checks[c.id].get_return_from(c)
                self.checks[c.id].status = 'waitconsume'
            except KeyError, exp:
                pass
        elif c.is_a == 'eventhandler':
            # It just die
            try:
                if c.status == 'timeout':
                    logger.warning("%s event handler command '%s ' timed out after %d seconds" %
                                   (self.actions[c.id].ref.__class__.my_type.capitalize(),
                                    self.actions[c.id].command,
                                    int(c.execution_time)))
                self.actions[c.id].status = 'zombie'
            # Maybe we got a return of a old even handler, so we can forget it
            except KeyError, exp:
                logger.warning('put_results:: get unknown event handler : %s ' % str(exp))
                pass
        else:
            logger.error("The received result type in unknown! %s" % str(c.is_a))
    # Get the good tabs for links regarding to the kind. If unknown, return None
    def get_links_from_type(self, type):
        t = {'poller': self.pollers, 'reactionner': self.reactionners}
        if type in t:
            return t[type]
        return None
    # Check if we do not connect to often to this
    def is_connection_try_too_close(self, elt):
        now = time.time()
        last_connection = elt['last_connection']
        if now - last_connection < 5:
            return  True
        return False
    # initialize or re-initialize connection with a poller
    # or a reactionner
    def pynag_con_init(self, id, type='poller'):
        # Get good links tab for looping..
        links = self.get_links_from_type(type)
        if links is None:
            logger.debug("Unknown '%s' type for connection!" % type)
            return
        # We want only to initiate connections to the passive
        # pollers and reactionners
        passive = links[id]['passive']
        if not passive:
            return
        # If we try to connect too much, we slow down our tests
        if self.is_connection_try_too_close(links[id]):
            return
        # Ok, we can now update it
        links[id]['last_connection'] = time.time()
        logger.debug("Init connection with %s" % links[id]['uri'])
        uri = links[id]['uri']
        try:
            # But the multiprocessing module is not compatible with it!
            # so we must disable it immediately after
            socket.setdefaulttimeout(3)
            links[id]['con'] = Pyro.core.getProxyForURI(uri)
            con = links[id]['con']
            socket.setdefaulttimeout(None)
        except Pyro_exp_pack, exp:
            # But the multiprocessing module is not compatible with it!
            # so we must disable it immediately after
            socket.setdefaulttimeout(None)
            logger.warning("Connection problem to the %s %s: %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return
        try:
            # initial ping must be quick
            pyro.set_timeout(con, 5)
            con.ping()
        except Pyro.errors.ProtocolError, exp:
            logger.warning("Connection problem to the %s %s: %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return
        except Pyro.errors.NamingError, exp:
            logger.warning("The %s '%s' is not initialized: %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return
        except KeyError, exp:
            logger.warning("The %s '%s' is not initialized: %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return
        except Pyro.errors.CommunicationError, exp:
            logger.warning("The %s '%s' got CommunicationError: %s" % (type, links[id]['name'], str(exp)))
            links[id]['con'] = None
            return
        logger.info("Connection OK to the %s %s" % (type, links[id]['name']))
    # We should push actions to our passives satellites
    def push_actions_to_passives_satellites(self):
        # We loop for our passive pollers or reactionners
        for p in filter(lambda p: p['passive'], self.pollers.values()):
            logger.debug("I will send actions to the poller %s" % str(p))
            con = p['con']
            poller_tags = p['poller_tags']
            if con is not None:
                # get actions
                lst = self.get_to_run_checks(True, False, poller_tags, worker_name=p['name'])
                try:
                    # initial ping must be quick
                    pyro.set_timeout(con, 120)
                    logger.debug("Sending %s actions" % len(lst))
                    con.push_actions(lst, self.instance_id)
                    self.nb_checks_send += len(lst)
                except Pyro.errors.ProtocolError, exp:
                    logger.warning("Connection problem to the %s %s: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except Pyro.errors.NamingError, exp:
                    logger.warning("The %s '%s' is not initialized: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except KeyError, exp:
                    logger.warning("The %s '%s' is not initialized: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except Pyro.errors.CommunicationError, exp:
                    logger.warning("The %s '%s' got CommunicationError: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                # we come back to normal timeout
                pyro.set_timeout(con, 5)
            else:  # no connection? try to reconnect
                self.pynag_con_init(p['instance_id'], type='poller')
        # TODO:factorize
        # We loop for our passive reactionners
        for p in filter(lambda p: p['passive'], self.reactionners.values()):
            logger.debug("I will send actions to the reactionner %s" % str(p))
            con = p['con']
            reactionner_tags = p['reactionner_tags']
            if con is not None:
            # get actions
                lst = self.get_to_run_checks(False, True, reactionner_tags=reactionner_tags, worker_name=p['name'])
                try:
                    # initial ping must be quick
                    pyro.set_timeout(con, 120)
                    logger.debug("Sending %d actions" % len(lst))
                    con.push_actions(lst, self.instance_id)
                    self.nb_checks_send += len(lst)
                except Pyro.errors.ProtocolError, exp:
                    logger.warning("Connection problem to the %s %s: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except Pyro.errors.NamingError, exp:
                    logger.warning("The %s '%s' is not initialized: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except KeyError, exp:
                    logger.warning("The %s '%s' is not initialized: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except Pyro.errors.CommunicationError, exp:
                    logger.warning("The %s '%s' got CommunicationError: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                # we come back to normal timeout
                pyro.set_timeout(con, 5)
            else:  # no connection? try to reconnect
                self.pynag_con_init(p['instance_id'], type='reactionner')
    # We should get returns from satellites
    def get_actions_from_passives_satellites(self):
        # We loop for our passive pollers
        for p in filter(lambda p: p['passive'], self.pollers.values()):
            logger.debug("I will get actions from the poller %s" % str(p))
            con = p['con']
            poller_tags = p['poller_tags']
            if con is not None:
                try:
                    # initial ping must be quick
                    pyro.set_timeout(con, 120)
                    results = con.get_returns(self.instance_id)
                    nb_received = len(results)
                    self.nb_check_received += nb_received
                    logger.debug("Received %d passive results" % nb_received)
                    for result in results:
                        result.set_type_passive()
                    self.waiting_results.extend(results)
                except Pyro.errors.ProtocolError, exp:
                    logger.warning("Connection problem to the %s %s: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except Pyro.errors.NamingError, exp:
                    logger.warning("The %s '%s' is not initialized: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except KeyError, exp:
                    logger.warning("The %s '%s' is not initialized: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except Pyro.errors.CommunicationError, exp:
                    logger.warning("The %s '%s' got CommunicationError: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                # we come back to normal timeout
                pyro.set_timeout(con, 5)
            else:  # no connection, try reinit
                self.pynag_con_init(p['instance_id'], type='poller')
        # We loop for our passive reactionners
        for p in filter(lambda p: p['passive'], self.reactionners.values()):
            logger.debug("I will get actions from the reactionner %s" % str(p))
            con = p['con']
            reactionner_tags = p['reactionner_tags']
            if con is not None:
                try:
                    # initial ping must be quick
                    pyro.set_timeout(con, 120)
                    results = con.get_returns(self.instance_id)
                    nb_received = len(results)
                    self.nb_check_received += nb_received
                    logger.debug("Received %d passive results" % nb_received)
                    for result in results:
                        result.set_type_passive()
                    self.waiting_results.extend(results)
                except Pyro.errors.ProtocolError, exp:
                    logger.warning("Connection problem to the %s %s: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except Pyro.errors.NamingError, exp:
                    logger.warning("The %s '%s' is not initialized: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except KeyError, exp:
                    logger.warning("The %s '%s' is not initialized: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                except Pyro.errors.CommunicationError, exp:
                    logger.warning("The %s '%s' got CommunicationError: %s" % (type, p['name'], str(exp)))
                    p['con'] = None
                    return
                # we come back to normal timeout
                pyro.set_timeout(con, 5)
            else:  # no connection, try reinit
                self.pynag_con_init(p['instance_id'], type='reactionner')
    # Some checks are purely internal, like business based one
    # simply ask their ref to manage it when it's ok to run
    def manage_internal_checks(self):
        now = time.time()
        for c in self.checks.values():
            # must be ok to launch, and not an internal one (business rules based)
            if c.internal and c.status == 'scheduled' and c.is_launchable(now):
                c.ref.manage_internal_check(c)
                # it manage it, now just ask to consume it
                # like for all checks
                c.status = 'waitconsume'
    # Call by brokers to have broks
    # We give them, and clean them!
    def get_broks(self, bname):
        # If we are here, we are sure the broker entry exists
        res = self.brokers[bname]['broks']
        # They are gone, we keep none!
        self.brokers[bname]['broks'] = {}
        # Also put in the result the possible first log broks if so
        res.update(self.broks)
        # and clean the global broks too now
        self.broks.clear()
        
        return res
    # An element can have its topology changed by an external command
    # if so a brok will be generated with this flag. No need to reset all of
    # them.
    def reset_topology_change_flag(self):
        for i in self.hosts:
            i.topology_change = False
        for i in  self.services:
            i.topology_change = False
    # Update the retention file and give all te data in
    # a dict so the read function can pickup what it wants
    # For now compression is not used, but it can be added easily
    # just uncomment :)
    def update_retention_file(self, forced=False):
        # If we set the update to 0, we do not want of this
        # if we do not forced (like at stopping)
        if self.conf.retention_update_interval == 0 and not forced:
            return
        self.hook_point('save_retention')
    # Load the retention file and get status from it. It does not get all checks in progress
    # for the moment, just the status and the notifications.
    def retention_load(self):
        self.hook_point('load_retention')
    # Helper function for module, will give the host and service
    # data
    def get_retention_data(self):
        # We create an all_data dict with list of useful retention data dicts
        # of our hosts and services
        all_data = {'hosts': {}, 'services': {}}
        for h in self.hosts:
            d = {}
            running_properties = h.__class__.running_properties
            for prop, entry in running_properties.items():
                if entry.retention:
                    v = getattr(h, prop)
                    # Maybe we should "prepare" the data before saving it
                    # like get only names instead of the whole objects
                    f = entry.retention_preparation
                    if f:
                        v = f(h, v)
                    d[prop] = v
            # and some properties are also like this, like
            # active checks enabled or not
            properties = h.__class__.properties
            for prop, entry in properties.items():
                if entry.retention:
                    v = getattr(h, prop)
                    # Maybe we should "prepare" the data before saving it
                    # like get only names instead of the whole objects
                    f = entry.retention_preparation
                    if f:
                        v = f(h, v)
                    d[prop] = v
            all_data['hosts'][h.host_name] = d
        # Same for services
        for s in self.services:
            d = {}
            running_properties = s.__class__.running_properties
            for prop, entry in running_properties.items():
                if entry.retention:
                    v = getattr(s, prop)
                    # Maybe we should "prepare" the data before saving it
                    # like get only names instead of the whole objects
                    f = entry.retention_preparation
                    if f:
                        v = f(s, v)
                    d[prop] = v
            # Same for properties, like active checks enabled or not
            properties = s.__class__.properties
            for prop, entry in properties.items():
                if entry.retention:
                    v = getattr(s, prop)
                    # Maybe we should "prepare" the data before saving it
                    # like get only names instead of the whole objects
                    f = entry.retention_preparation
                    if f:
                        v = f(s, v)
                    d[prop] = v
            all_data['services'][(s.host.host_name, s.service_description)] = d
        return all_data
    # Get back our broks from a retention module :)
    def restore_retention_data(self, data):
        # Now load interesting properties in hosts/services
        # Tagging retention=False prop that not be directly load
        # Items will be with theirs status, but not in checking, so
        # a new check will be launched like with a normal beginning (random distributed
        # scheduling)
        ret_hosts = data['hosts']
        for ret_h_name in ret_hosts:
            # We take the dict of our value to load
            d = data['hosts'][ret_h_name]
            h = self.hosts.find_by_name(ret_h_name)
            if h is not None:
                # First manage all running properties
                running_properties = h.__class__.running_properties
                for prop, entry in running_properties.items():
                    if entry.retention:
                        # Maybe the saved one was not with this value, so
                        # we just bypass this
                        if prop in d:
                            setattr(h, prop, d[prop])
                # Ok, some are in properties too (like active check enabled
                # or not. Will OVERRIDE THE CONFIGURATION VALUE!
                properties = h.__class__.properties
                for prop, entry in properties.items():
                    if entry.retention:
                        # Maybe the saved one was not with this value, so
                        # we just bypass this
                        if prop in d:
                            setattr(h, prop, d[prop])
                # Now manage all linked objects load from previous run
                for a in h.notifications_in_progress.values():
                    a.ref = h
                    self.add(a)
                    # Also raises the action id, so do not overlap ids
                    a.assume_at_least_id(a.id)
                h.update_in_checking()
                # And also add downtimes and comments
                for dt in h.downtimes:
                    dt.ref = h
                    if hasattr(dt, 'extra_comment'):
                        dt.extra_comment.ref = h
                    else:
                        dt.extra_comment = None
                    # raises the downtime id to do not overlap
                    Downtime.id = max(Downtime.id, dt.id + 1)
                    self.add(dt)
                for c in h.comments:
                    c.ref = h
                    self.add(c)
                    # raises comment id to do not overlap ids
                    Comment.id = max(Comment.id, c.id + 1)
                if h.acknowledgement is not None:
                    h.acknowledgement.ref = h
                    # Raises the id of future ack so we don't overwrite
                    # these one
                    Acknowledge.id = max(Acknowledge.id, h.acknowledgement.id + 1)
                # Relink the notified_contacts as a set() of true contacts objects
                # it it was load from the retention, it's now a list of contacts
                # names
                if 'notified_contacts' in d:
                    new_notified_contacts = set()
                    for cname in h.notified_contacts:
                        c = self.contacts.find_by_name(cname)
                        # Maybe the contact is gone. Skip it
                        if c:
                            new_notified_contacts.add(c)
                    h.notified_contacts = new_notified_contacts
        # SAme for services
        ret_services = data['services']
        for (ret_s_h_name, ret_s_desc) in ret_services:
            # We take our dict to load
            d = data['services'][(ret_s_h_name, ret_s_desc)]
            s = self.services.find_srv_by_name_and_hostname(ret_s_h_name, ret_s_desc)
            if s is not None:
                # Load the major values from running properties
                running_properties = s.__class__.running_properties
                for prop, entry in running_properties.items():
                    if entry.retention:
                        # Maybe the saved one was not with this value, so
                        # we just bypass this
                        if prop in d:
                            setattr(s, prop, d[prop])
                # And some others from properties dict too
                properties = s.__class__.properties
                for prop, entry in properties.items():
                    if entry.retention:
                        # Maybe the saved one was not with this value, so
                        # we just bypass this
                        if prop in d:
                            setattr(s, prop, d[prop])
                # Ok now manage all linked objects
                for a in s.notifications_in_progress.values():
                    a.ref = s
                    self.add(a)
                    # Also raises the action id, so do not overlap id
                    a.assume_at_least_id(a.id)
                s.update_in_checking()
                # And also add downtimes and comments
                for dt in s.downtimes:
                    dt.ref = s
                    if hasattr(dt, 'extra_comment'):
                        dt.extra_comment.ref = s
                    else:
                        dt.extra_comment = None
                    # raises the downtime id to do not overlap
                    Downtime.id = max(Downtime.id, dt.id + 1)
                    self.add(dt)
                for c in s.comments:
                    c.ref = s
                    self.add(c)
                    # raises comment id to do not overlap ids
                    Comment.id = max(Comment.id, c.id + 1)
                if s.acknowledgement is not None:
                    s.acknowledgement.ref = s
                    # Raises the id of future ack so we don't overwrite
                    # these one
                    Acknowledge.id = max(Acknowledge.id, s.acknowledgement.id + 1)
                # Relink the notified_contacts as a set() of true contacts objects
                # it it was load from the retention, it's now a list of contacts
                # names
                if 'notified_contacts' in d:
                    new_notified_contacts = set()
                    for cname in s.notified_contacts:
                        c = self.contacts.find_by_name(cname)
                        # Maybe the contact is gone. Skip it
                        if c:
                            new_notified_contacts.add(c)
                    s.notified_contacts = new_notified_contacts
    # Fill the self.broks with broks of self (process id, and co)
    # broks of service and hosts (initial status)
    def fill_initial_broks(self, bname, with_logs=False):
        # First a Brok for delete all from my instance_id
        b = Brok('clean_all_my_instance_id', {'instance_id': self.instance_id})
        self.add_Brok(b, bname)
        # first the program status
        b = self.get_program_status_brok()
        self.add_Brok(b, bname)
        #  We can't call initial_status from all this types
        #  The order is important, service need host...
        initial_status_types = (self.timeperiods, self.commands,
                          self.contacts, self.contactgroups,
                          self.hosts, self.hostgroups,
                          self.services, self.servicegroups)
        self.conf.skip_initial_broks = getattr(self.conf,'skip_initial_broks', False)
        logger.debug("Skipping initial broks? %s" % str(self.conf.skip_initial_broks))
        if not self.conf.skip_initial_broks:
            for tab in initial_status_types:
                for i in tab:
                    b = i.get_initial_status_brok()
                    self.add_Brok(b, bname)
        # Only raises the all logs at the scheduler startup
        if with_logs:
            # Ask for INITIAL logs for services and hosts
            for i in self.hosts:
                i.raise_initial_state()
            for i in self.services:
                i.raise_initial_state()
        # Add a brok to say that we finished all initial_pass
        b = Brok('initial_broks_done', {'instance_id': self.instance_id})
        self.add_Brok(b, bname)
        # We now have all full broks
        self.has_full_broks = True
        logger.info("[%s] Created %d initial Broks for broker %s" % (self.instance_name, len(self.brokers[bname]['broks']), bname))
    # Crate a brok with program status info
    def get_and_register_program_status_brok(self):
        b = self.get_program_status_brok()
        self.add(b)
    # Crate a brok with program status info
    def get_and_register_update_program_status_brok(self):
        b = self.get_program_status_brok()
        b.type = 'update_program_status'
        self.add(b)
    # Get a brok with program status
    # TODO: GET REAL VALUES
    def get_program_status_brok(self):
        now = int(time.time())
        data = {"is_running": 1,
                "instance_id": self.instance_id,
                "instance_name": self.instance_name,
                "last_alive": now,
                "program_start": self.program_start,
                "pid": os.getpid(),
                "daemon_mode": 1,
                "last_command_check": now,
                "last_log_rotation": now,
                "notifications_enabled": self.conf.enable_notifications,
                "active_service_checks_enabled": self.conf.execute_service_checks,
                "passive_service_checks_enabled": self.conf.accept_passive_service_checks,
                "active_host_checks_enabled": self.conf.execute_host_checks,
                "passive_host_checks_enabled": self.conf.accept_passive_host_checks,
                "event_handlers_enabled": self.conf.enable_event_handlers,
                "flap_detection_enabled": self.conf.enable_flap_detection,
                "failure_prediction_enabled": 0,
                "process_performance_data": self.conf.process_performance_data,
                "obsess_over_hosts": self.conf.obsess_over_hosts,
                "obsess_over_services": self.conf.obsess_over_services,
                "modified_host_attributes": 0,
                "modified_service_attributes": 0,
                "global_host_event_handler": self.conf.global_host_event_handler,
                'global_service_event_handler': self.conf.global_service_event_handler,
                'check_external_commands': self.conf.check_external_commands,
                'check_service_freshness': self.conf.check_service_freshness,
                'check_host_freshness': self.conf.check_host_freshness,
                'command_file': self.conf.command_file
                }
        b = Brok('program_status', data)
        return b
    # Called every 1sec to consume every result in services or hosts
    # with these results, they are OK, CRITICAL, UP/DOWN, etc...
    def consume_results(self):
        # All results are in self.waiting_results
        # We need to get them first
        for c in self.waiting_results:
            self.put_results(c)
        self.waiting_results = []
        # Then we consume them
        #print "**********Consume*********"
        for c in self.checks.values():
            if c.status == 'waitconsume':
                item = c.ref
                item.consume_result(c)
        # All 'finished' checks (no more dep) raise checks they depends on
        for c in self.checks.values():
            if c.status == 'havetoresolvedep':
                for dependent_checks in c.depend_on_me:
                    # Ok, now dependent will no more wait c
                    dependent_checks.depend_on.remove(c.id)
                # REMOVE OLD DEP CHECK -> zombie
                c.status = 'zombie'
        # Now, reinteger dep checks
        for c in self.checks.values():
            if c.status == 'waitdep' and len(c.depend_on) == 0:
                item = c.ref
                item.consume_result(c)
    # Called every 1sec to delete all checks in a zombie state
    # zombie = not useful anymore
    def delete_zombie_checks(self):
        #print "**********Delete zombies checks****"
        id_to_del = []
        for c in self.checks.values():
            if c.status == 'zombie':
                id_to_del.append(c.id)
        # une petite tape dans le dos et tu t'en vas, merci...
        # *pat pat* GFTO, thks :)
        for id in id_to_del:
            del self.checks[id]  # ZANKUSEN!
    # Called every 1sec to delete all actions in a zombie state
    # zombie = not useful anymore
    def delete_zombie_actions(self):
        #print "**********Delete zombies actions****"
        id_to_del = []
        for a in self.actions.values():
            if a.status == 'zombie':
                id_to_del.append(a.id)
        # une petite tape dans le dos et tu t'en vas, merci...
        # *pat pat* GFTO, thks :)
        for id in id_to_del:
            del self.actions[id]  # ZANKUSEN!
    # Check for downtimes start and stop, and register
    # them if needed
    def update_downtimes_and_comments(self):
        broks = []
        now = time.time()
        # Look for in objects comments, and look if we already got them
        for elt in [y for y in [x for x in self.hosts] + [x for x in self.services]]:
            for c in elt.comments:
                if not c.id in self.comments:
                    self.comments[c.id] = c
        # Check maintenance periods
        for elt in [y for y in [x for x in self.hosts] + [x for x in self.services] if y.maintenance_period is not None]:
            if not hasattr(elt, 'in_maintenance'):
                setattr(elt, 'in_maintenance', False)
            if not elt.in_maintenance:
                if elt.maintenance_period.is_time_valid(now):
                    start_dt = elt.maintenance_period.get_next_valid_time_from_t(now)
                    end_dt = elt.maintenance_period.get_next_invalid_time_from_t(start_dt + 1) - 1
                    dt = Downtime(elt, start_dt, end_dt, 1, 0, 0, "system", "this downtime was automatically scheduled through a maintenance_period")
                    elt.add_downtime(dt)
                    self.add(dt)
                    self.get_and_register_status_brok(elt)
                    elt.in_maintenance = dt.id
            else:
                if not elt.in_maintenance in self.downtimes:
                    # the main downtimes has expired or was manually deleted
                    elt.in_maintenance = False
        #  Check the validity of contact downtimes
        for elt in self.contacts:
            for dt in elt.downtimes:
                dt.check_activation()
        # A loop where those downtimes are removed
        # which were marked for deletion (mostly by dt.exit())
        for dt in self.downtimes.values():
            if dt.can_be_deleted == True:
                ref = dt.ref
                self.del_downtime(dt.id)
                broks.append(ref.get_update_status_brok())
        # Same for contact downtimes:
        for dt in self.contact_downtimes.values():
            if dt.can_be_deleted == True:
                ref = dt.ref
                self.del_contact_downtime(dt.id)
                broks.append(ref.get_update_status_brok())
        # Downtimes are usually accompanied by a comment.
        # An exiting downtime also invalidates it's comment.
        for c in self.comments.values():
            if c.can_be_deleted == True:
                ref = c.ref
                self.del_comment(c.id)
                broks.append(ref.get_update_status_brok())
        # Check start and stop times
        for dt in self.downtimes.values():
            if dt.real_end_time < now:
                # this one has expired
                broks.extend(dt.exit())  # returns downtimestop notifications
            elif now >= dt.start_time and dt.fixed and not dt.is_in_effect:
                # this one has to start now
                broks.extend(dt.enter())  # returns downtimestart notifications
                broks.append(dt.ref.get_update_status_brok())
        for b in broks:
            self.add(b)
    # Main schedule function to make the regular scheduling
    def schedule(self):
        # ask for service and hosts their next check
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                i.schedule()
    # Main actions reaper function: it get all new checks,
    # notification and event handler from hosts and services
    def get_new_actions(self):
        self.hook_point('get_new_actions')
        # ask for service and hosts their next check
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                for a in i.actions:
                    self.add(a)
                # We take all, we can clear it
                i.actions = []
    # Similar as above, but for broks
    def get_new_broks(self):
        # ask for service and hosts their broks waiting
        # be eaten
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                for b in i.broks:
                    self.add(b)
                # We take all, we can clear it
                i.broks = []
    # Raises checks for no fresh states for services and hosts
    def check_freshness(self):
        #print "********** Check freshness******"
        for type_tab in [self.services, self.hosts]:
            for i in type_tab:
                c = i.do_check_freshness()
                if c is not None:
                    self.add(c)
    # Check for orphaned checks: checks that never returns back
    # so if inpoller and t_to_go < now - 300s: pb!
    # Warn only one time for each "worker"
    # XXX I think we should make "time_to_orphanage" configurable
    #     each action type, each for notification, event_handler & check
    #     I think it will be a little more useful that way, not sure tho
    def check_orphaned(self):
        worker_names = {}
        now = int(time.time())
        for c in self.checks.values():
            time_to_orphanage = c.ref.get_time_to_orphanage()
            if time_to_orphanage:
                if c.status == 'inpoller' and c.t_to_go < now - time_to_orphanage:
                    c.status = 'scheduled'
                    if c.worker not in worker_names:
                        worker_names[c.worker] = 1
                        continue
                    worker_names[c.worker] += 1
        for a in self.actions.values():
            time_to_orphanage = a.ref.get_time_to_orphanage()
            if time_to_orphanage:
                if a.status == 'inpoller' and a.t_to_go < now - time_to_orphanage:
                    a.status = 'scheduled'
                    if a.worker not in worker_names:
                        worker_names[a.worker] = 1
                        continue
                    worker_names[a.worker] += 1
        for w in worker_names:
            logger.warning("%d actions never came back for the satellite '%s'. I'm reenable them for polling" % (worker_names[w], w))
    # Each loop we are going to send our broks to our modules (if need)
    def send_broks_to_modules(self):
        t0 = time.time()
        nb_sent = 0
        for mod in self.sched_daemon.modules_manager.get_external_instances():
            logger.debug("Look for sending to module %s" % mod.get_name())
            q = mod.to_q
            to_send = [b for b in self.broks.values() if not getattr(b, 'sent_to_sched_externals', False) and mod.want_brok(b)]
            q.put(to_send)
            nb_sent += len(to_send)
        # No more need to send them
        for b in self.broks.values():
            b.sent_to_sched_externals = True
        logger.debug("Time to send %s broks (after %d secs)" % (nb_sent, time.time() - t0))
    # Get 'objects' from external modules
    # right now on nobody uses it, but it can be useful
    # for a module like livestatus to raise external
    # commands for example
    def get_objects_from_from_queues(self):
        for f in self.sched_daemon.modules_manager.get_external_from_queues():
            full_queue = True
            while full_queue:
                try:
                    o = f.get(block=False)
                    self.add(o)
                except Empty:
                    full_queue = False
    # Main function
    def run(self):
        # Then we see if we've got info in the retention file
        self.retention_load()
        # Finally start the external modules now we got our data
        self.hook_point('pre_scheduler_mod_start')
        self.sched_daemon.modules_manager.start_external_instances(late_start=True)
        # Ok, now all is initialized, we can make the initial broks
        logger.info("[%s] First scheduling launched" % self.instance_name)
        self.schedule()
        logger.info("[%s] First scheduling done" % self.instance_name)
        # Now connect to the passive satellites if needed
        for p_id in self.pollers:
            self.pynag_con_init(p_id, type='poller')
        # Ticks are for recurrent function call like consume
        # del zombies etc
        ticks = 0
        timeout = 1.0  # For the select
        gogogo = time.time()
        self.load_one_min = Load(initial_value=1)
        logger.debug("First loop at %d" % time.time())
        while self.must_run:
            #print "Loop"
            # Before answer to brokers, we send our broks to modules
            # Ok, go to send our broks to our external modules
            #self.send_broks_to_modules()
            elapsed, _, _ = self.sched_daemon.handleRequests(timeout)
            if elapsed:
                timeout -= elapsed
                if timeout > 0:
                    continue
            self.load_one_min.update_load(self.sched_daemon.sleep_time)
            # load of the scheduler is the percert of time it is waiting
            l = min(100, 100.0 - self.load_one_min.get_load() * 100)
            logger.debug("Load: (sleep) %.2f (average: %.2f) -> %d%%" % (self.sched_daemon.sleep_time, self.load_one_min.get_load(), l))
            self.sched_daemon.sleep_time = 0.0
            # Timeout or time over
            timeout = 1.0
            ticks += 1
            # Do recurrent works like schedule, consume
            # delete_zombie_checks
            for i in self.recurrent_works:
                (name, f, nb_ticks) = self.recurrent_works[i]
                # A 0 in the tick will just disable it
                if nb_ticks != 0:
                    if ticks % nb_ticks == 0:
                        # print "I run function:", name
                        f()
            # DBG: push actions to passives?
            self.push_actions_to_passives_satellites()
            self.get_actions_from_passives_satellites()
            #if  ticks % 10 == 0:
            #    self.conf.quick_debug()
            # stats
            nb_scheduled = len([c for c in self.checks.values() if c.status == 'scheduled'])
            nb_inpoller = len([c for c in self.checks.values() if c.status == 'inpoller'])
            nb_zombies = len([c for c in self.checks.values() if c.status == 'zombie'])
            nb_notifications = len(self.actions)
            logger.debug("Checks: total %s, scheduled %s, inpoller %s, zombies %s, notifications %s" %\
                (len(self.checks), nb_scheduled, nb_inpoller, nb_zombies, nb_notifications))
            # Get a overview of the latencies with just
            # a 95 percentile view, but lso min/max values
            latencies = [s.latency for s in self.services]
            lat_avg, lat_min, lat_max = nighty_five_percent(latencies)
            if lat_avg is not None:
                logger.debug("Latency (avg/min/max): %.2f/%.2f/%.2f" % (lat_avg, lat_min, lat_max))
            # print "Notifications:", nb_notifications
            now = time.time()
            if self.nb_checks_send != 0:
                logger.debug("Nb checks/notifications/event send: %s" % self.nb_checks_send)
            self.nb_checks_send = 0
            if self.nb_broks_send != 0:
                logger.debug("Nb Broks send: %s" % self.nb_broks_send)
            self.nb_broks_send = 0
            time_elapsed = now - gogogo
            logger.debug("Check average = %d checks/s" % int(self.nb_check_received / time_elapsed))
            if self.need_dump_memory:
                self.sched_daemon.dump_memory()
                self.need_dump_memory = False
            if self.need_objects_dump:
                logger.debug('I need to dump my objects!')
                self.dump_objects()
                self.need_objects_dump = False
        # WE must save the retention at the quit BY OURSELF
        # because our daemon will not be able to do it for us
        self.update_retention_file(True)
 |