/usr/lib/python2.7/dist-packages/breezy/config.py is in python-breezy 3.0.0~bzr6852-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991 1992 1993 1994 1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017 2018 2019 2020 2021 2022 2023 2024 2025 2026 2027 2028 2029 2030 2031 2032 2033 2034 2035 2036 2037 2038 2039 2040 2041 2042 2043 2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054 2055 2056 2057 2058 2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149 2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197 2198 2199 2200 2201 2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309 2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328 2329 2330 2331 2332 2333 2334 2335 2336 2337 2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428 2429 2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453 2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529 2530 2531 2532 2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560 2561 2562 2563 2564 2565 2566 2567 2568 2569 2570 2571 2572 2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654 2655 2656 2657 2658 2659 2660 2661 2662 2663 2664 2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699 2700 2701 2702 2703 2704 2705 2706 2707 2708 2709 2710 2711 2712 2713 2714 2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784 2785 2786 2787 2788 2789 2790 2791 2792 2793 2794 2795 2796 2797 2798 2799 2800 2801 2802 2803 2804 2805 2806 2807 2808 2809 2810 2811 2812 2813 2814 2815 2816 2817 2818 2819 2820 2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837 2838 2839 2840 2841 2842 2843 2844 2845 2846 2847 2848 2849 2850 2851 2852 2853 2854 2855 2856 2857 2858 2859 2860 2861 2862 2863 2864 2865 2866 2867 2868 2869 2870 2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887 2888 2889 2890 2891 2892 2893 2894 2895 2896 2897 2898 2899 2900 2901 2902 2903 2904 2905 2906 2907 2908 2909 2910 2911 2912 2913 2914 2915 2916 2917 2918 2919 2920 2921 2922 2923 2924 2925 2926 2927 2928 2929 2930 2931 2932 2933 2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971 2972 2973 2974 2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987 2988 2989 2990 2991 2992 2993 2994 2995 2996 2997 2998 2999 3000 3001 3002 3003 3004 3005 3006 3007 3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022 3023 3024 3025 3026 3027 3028 3029 3030 3031 3032 3033 3034 3035 3036 3037 3038 3039 3040 3041 3042 3043 3044 3045 3046 3047 3048 3049 3050 3051 3052 3053 3054 3055 3056 3057 3058 3059 3060 3061 3062 3063 3064 3065 3066 3067 3068 3069 3070 3071 3072 3073 3074 3075 3076 3077 3078 3079 3080 3081 3082 3083 3084 3085 3086 3087 3088 3089 3090 3091 3092 3093 3094 3095 3096 3097 3098 3099 3100 3101 3102 3103 3104 3105 3106 3107 3108 3109 3110 3111 3112 3113 3114 3115 3116 3117 3118 3119 3120 3121 3122 3123 3124 3125 3126 3127 3128 3129 3130 3131 3132 3133 3134 3135 3136 3137 3138 3139 3140 3141 3142 3143 3144 3145 3146 3147 3148 3149 3150 3151 3152 3153 3154 3155 3156 3157 3158 3159 3160 3161 3162 3163 3164 3165 3166 3167 3168 3169 3170 3171 3172 3173 3174 3175 3176 3177 3178 3179 3180 3181 3182 3183 3184 3185 3186 3187 3188 3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199 3200 3201 3202 3203 3204 3205 3206 3207 3208 3209 3210 3211 3212 3213 3214 3215 3216 3217 3218 3219 3220 3221 3222 3223 3224 3225 3226 3227 3228 3229 3230 3231 3232 3233 3234 3235 3236 3237 3238 3239 3240 3241 3242 3243 3244 3245 3246 3247 3248 3249 3250 3251 3252 3253 3254 3255 3256 3257 3258 3259 3260 3261 3262 3263 3264 3265 3266 3267 3268 3269 3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316 3317 3318 3319 3320 3321 3322 3323 3324 3325 3326 3327 3328 3329 3330 3331 3332 3333 3334 3335 3336 3337 3338 3339 3340 3341 3342 3343 3344 3345 3346 3347 3348 3349 3350 3351 3352 3353 3354 3355 3356 3357 3358 3359 3360 3361 3362 3363 3364 3365 3366 3367 3368 3369 3370 3371 3372 3373 3374 3375 3376 3377 3378 3379 3380 3381 3382 3383 3384 3385 3386 3387 3388 3389 3390 3391 3392 3393 3394 3395 3396 3397 3398 3399 3400 3401 3402 3403 3404 3405 3406 3407 3408 3409 3410 3411 3412 3413 3414 3415 3416 3417 3418 3419 3420 3421 3422 3423 3424 3425 3426 3427 3428 3429 3430 3431 3432 3433 3434 3435 3436 3437 3438 3439 3440 3441 3442 3443 3444 3445 3446 3447 3448 3449 3450 3451 3452 3453 3454 3455 3456 3457 3458 3459 3460 3461 3462 3463 3464 3465 3466 3467 3468 3469 3470 3471 3472 3473 3474 3475 3476 3477 3478 3479 3480 3481 3482 3483 3484 3485 3486 3487 3488 3489 3490 3491 3492 3493 3494 3495 3496 3497 3498 3499 3500 3501 3502 3503 3504 3505 3506 3507 3508 3509 3510 3511 3512 3513 3514 3515 3516 3517 3518 3519 3520 3521 3522 3523 3524 3525 3526 3527 3528 3529 3530 3531 3532 3533 3534 3535 3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546 3547 3548 3549 3550 3551 3552 3553 3554 3555 3556 3557 3558 3559 3560 3561 3562 3563 3564 3565 3566 3567 3568 3569 3570 3571 3572 3573 3574 3575 3576 3577 3578 3579 3580 3581 3582 3583 3584 3585 3586 3587 3588 3589 3590 3591 3592 3593 3594 3595 3596 3597 3598 3599 3600 3601 3602 3603 3604 3605 3606 3607 3608 3609 3610 3611 3612 3613 3614 3615 3616 3617 3618 3619 3620 3621 3622 3623 3624 3625 3626 3627 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 3654 3655 3656 3657 3658 3659 3660 3661 3662 3663 3664 3665 3666 3667 3668 3669 3670 3671 3672 3673 3674 3675 3676 3677 3678 3679 3680 3681 3682 3683 3684 3685 3686 3687 3688 3689 3690 3691 3692 3693 3694 3695 3696 3697 3698 3699 3700 3701 3702 3703 3704 3705 3706 3707 3708 3709 3710 3711 3712 3713 3714 3715 3716 3717 3718 3719 3720 3721 3722 3723 3724 3725 3726 3727 3728 3729 3730 3731 3732 3733 3734 3735 3736 3737 3738 3739 3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750 3751 3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772 3773 3774 3775 3776 3777 3778 3779 3780 3781 3782 3783 3784 3785 3786 3787 3788 3789 3790 3791 3792 3793 3794 3795 3796 3797 3798 3799 3800 3801 3802 3803 3804 3805 3806 3807 3808 3809 3810 3811 3812 3813 3814 3815 3816 3817 3818 3819 3820 3821 3822 3823 3824 3825 3826 3827 3828 3829 3830 3831 3832 3833 3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844 3845 3846 3847 3848 3849 3850 3851 3852 3853 3854 3855 3856 3857 3858 3859 3860 3861 3862 3863 3864 3865 3866 3867 3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883 3884 3885 3886 3887 3888 3889 3890 3891 3892 3893 3894 3895 3896 3897 3898 3899 3900 3901 3902 3903 3904 3905 3906 3907 3908 3909 3910 3911 3912 3913 3914 3915 3916 3917 3918 3919 3920 3921 3922 3923 3924 3925 3926 3927 3928 3929 3930 3931 3932 3933 3934 3935 3936 3937 3938 3939 3940 3941 3942 3943 3944 3945 3946 3947 3948 3949 3950 3951 3952 3953 3954 3955 3956 3957 3958 3959 3960 3961 3962 3963 3964 3965 3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980 3981 3982 3983 3984 3985 3986 3987 3988 3989 3990 3991 3992 3993 3994 3995 3996 3997 3998 3999 4000 4001 4002 4003 4004 4005 4006 4007 4008 4009 4010 4011 4012 4013 4014 4015 4016 4017 4018 4019 4020 4021 4022 4023 4024 4025 4026 4027 4028 4029 4030 4031 4032 4033 4034 4035 4036 4037 4038 4039 4040 4041 4042 4043 4044 4045 4046 4047 4048 4049 4050 4051 4052 4053 4054 4055 4056 4057 4058 4059 4060 4061 4062 4063 4064 4065 4066 4067 4068 4069 4070 4071 4072 4073 4074 4075 4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088 4089 4090 4091 4092 4093 4094 4095 4096 4097 4098 4099 4100 4101 4102 4103 4104 4105 4106 4107 4108 4109 4110 4111 4112 4113 4114 4115 4116 4117 4118 4119 4120 4121 4122 4123 4124 4125 4126 4127 4128 4129 4130 4131 4132 4133 4134 4135 4136 4137 4138 4139 4140 4141 4142 4143 4144 4145 4146 4147 4148 4149 4150 4151 4152 4153 4154 4155 4156 4157 4158 4159 4160 4161 4162 4163 4164 4165 4166 4167 4168 4169 4170 4171 4172 4173 4174 4175 4176 4177 4178 4179 4180 4181 4182 4183 4184 4185 4186 4187 4188 4189 4190 4191 4192 4193 4194 4195 4196 4197 4198 4199 4200 4201 4202 4203 4204 4205 4206 4207 4208 4209 4210 4211 4212 4213 4214 4215 4216 4217 4218 4219 4220 4221 4222 4223 4224 4225 4226 4227 4228 4229 4230 4231 4232 4233 4234 4235 4236 4237 4238 4239 4240 4241 4242 4243 4244 4245 4246 4247 4248 4249 4250 4251 4252 4253 4254 4255 4256 4257 4258 4259 4260 4261 4262 4263 4264 4265 4266 4267 4268 4269 4270 4271 4272 4273 4274 4275 4276 4277 4278 4279 4280 4281 4282 4283 4284 4285 4286 4287 4288 4289 4290 4291 4292 4293 4294 4295 4296 4297 4298 4299 4300 4301 4302 4303 | # Copyright (C) 2005-2014, 2016 Canonical Ltd
# Authors: Robert Collins <robert.collins@canonical.com>
# and others
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Configuration that affects the behaviour of Breezy.
Currently this configuration resides in ~/.config/breezy/breezy.conf
and ~/.config/breezy/locations.conf, which is written to by brz.
If the first location doesn't exist, then brz falls back to reading
Bazaar configuration files in ~/.bazaar or ~/.config/bazaar.
In breezy.conf the following options may be set:
[DEFAULT]
editor=name-of-program
email=Your Name <your@email.address>
check_signatures=require|ignore|check-available(default)
create_signatures=always|never|when-required(default)
log_format=name-of-format
validate_signatures_in_log=true|false(default)
acceptable_keys=pattern1,pattern2
gpg_signing_key=amy@example.com
in locations.conf, you specify the url of a branch and options for it.
Wildcards may be used - * and ? as normal in shell completion. Options
set in both breezy.conf and locations.conf are overridden by the locations.conf
setting.
[/home/robertc/source]
recurse=False|True(default)
email= as above
check_signatures= as above
create_signatures= as above.
validate_signatures_in_log=as above
acceptable_keys=as above
explanation of options
----------------------
editor - this option sets the pop up editor to use during commits.
email - this option sets the user id brz will use when committing.
check_signatures - this option will control whether brz will require good gpg
signatures, ignore them, or check them if they are
present. Currently it is unused except that check_signatures
turns on create_signatures.
create_signatures - this option controls whether brz will always create
gpg signatures or not on commits. There is an unused
option which in future is expected to work if
branch settings require signatures.
log_format - this option sets the default log format. Possible values are
long, short, line, or a plugin can register new formats.
validate_signatures_in_log - show GPG signature validity in log output
acceptable_keys - comma separated list of key patterns acceptable for
verify-signatures command
In breezy.conf you can also define aliases in the ALIASES sections, example
[ALIASES]
lastlog=log --line -r-10..-1
ll=log --line -r-10..-1
h=help
up=pull
"""
from __future__ import absolute_import
import os
import sys
import configobj
import breezy
from .lazy_import import lazy_import
lazy_import(globals(), """
import base64
import errno
import fnmatch
import re
import stat
from breezy import (
atomicfile,
controldir,
debug,
directory_service,
lazy_regex,
library_state,
lock,
lockdir,
mergetools,
osutils,
trace,
transport,
ui,
urlutils,
win32utils,
)
from breezy.i18n import gettext
""")
from . import (
commands,
errors,
hooks,
lazy_regex,
registry,
)
from .sixish import (
binary_type,
BytesIO,
PY3,
text_type,
string_types,
)
CHECK_IF_POSSIBLE=0
CHECK_ALWAYS=1
CHECK_NEVER=2
SIGN_WHEN_REQUIRED=0
SIGN_ALWAYS=1
SIGN_NEVER=2
POLICY_NONE = 0
POLICY_NORECURSE = 1
POLICY_APPENDPATH = 2
_policy_name = {
POLICY_NONE: None,
POLICY_NORECURSE: 'norecurse',
POLICY_APPENDPATH: 'appendpath',
}
_policy_value = {
None: POLICY_NONE,
'none': POLICY_NONE,
'norecurse': POLICY_NORECURSE,
'appendpath': POLICY_APPENDPATH,
}
STORE_LOCATION = POLICY_NONE
STORE_LOCATION_NORECURSE = POLICY_NORECURSE
STORE_LOCATION_APPENDPATH = POLICY_APPENDPATH
STORE_BRANCH = 3
STORE_GLOBAL = 4
class OptionExpansionLoop(errors.BzrError):
_fmt = 'Loop involving %(refs)r while expanding "%(string)s".'
def __init__(self, string, refs):
self.string = string
self.refs = '->'.join(refs)
class ExpandingUnknownOption(errors.BzrError):
_fmt = 'Option "%(name)s" is not defined while expanding "%(string)s".'
def __init__(self, name, string):
self.name = name
self.string = string
class IllegalOptionName(errors.BzrError):
_fmt = 'Option "%(name)s" is not allowed.'
def __init__(self, name):
self.name = name
class ConfigContentError(errors.BzrError):
_fmt = "Config file %(filename)s is not UTF-8 encoded\n"
def __init__(self, filename):
self.filename = filename
class ParseConfigError(errors.BzrError):
_fmt = "Error(s) parsing config file %(filename)s:\n%(errors)s"
def __init__(self, errors, filename):
self.filename = filename
self.errors = '\n'.join(e.msg for e in errors)
class ConfigOptionValueError(errors.BzrError):
_fmt = ('Bad value "%(value)s" for option "%(name)s".\n'
'See ``brz help %(name)s``')
def __init__(self, name, value):
errors.BzrError.__init__(self, name=name, value=value)
class NoEmailInUsername(errors.BzrError):
_fmt = "%(username)r does not seem to contain a reasonable email address"
def __init__(self, username):
self.username = username
class NoSuchConfig(errors.BzrError):
_fmt = ('The "%(config_id)s" configuration does not exist.')
def __init__(self, config_id):
errors.BzrError.__init__(self, config_id=config_id)
class NoSuchConfigOption(errors.BzrError):
_fmt = ('The "%(option_name)s" configuration option does not exist.')
def __init__(self, option_name):
errors.BzrError.__init__(self, option_name=option_name)
class NoWhoami(errors.BzrError):
_fmt = ('Unable to determine your name.\n'
"Please, set your name with the 'whoami' command.\n"
'E.g. brz whoami "Your Name <name@example.com>"')
def signature_policy_from_unicode(signature_string):
"""Convert a string to a signing policy."""
if signature_string.lower() == 'check-available':
return CHECK_IF_POSSIBLE
if signature_string.lower() == 'ignore':
return CHECK_NEVER
if signature_string.lower() == 'require':
return CHECK_ALWAYS
raise ValueError("Invalid signatures policy '%s'"
% signature_string)
def signing_policy_from_unicode(signature_string):
"""Convert a string to a signing policy."""
if signature_string.lower() == 'when-required':
return SIGN_WHEN_REQUIRED
if signature_string.lower() == 'never':
return SIGN_NEVER
if signature_string.lower() == 'always':
return SIGN_ALWAYS
raise ValueError("Invalid signing policy '%s'"
% signature_string)
def _has_decode_bug():
"""True if configobj will fail to decode to unicode on Python 2."""
if sys.version_info > (3,):
return False
conf = configobj.ConfigObj()
decode = getattr(conf, "_decode", None)
if decode:
result = decode(b"\xc2\xa7", "utf-8")
if isinstance(result[0], str):
return True
return False
def _has_triplequote_bug():
"""True if triple quote logic is reversed, see lp:710410."""
conf = configobj.ConfigObj()
quote = getattr(conf, "_get_triple_quote", None)
if quote and quote('"""') != "'''":
return True
return False
class ConfigObj(configobj.ConfigObj):
def __init__(self, infile=None, **kwargs):
# We define our own interpolation mechanism calling it option expansion
super(ConfigObj, self).__init__(infile=infile,
interpolation=False,
**kwargs)
if _has_decode_bug():
def _decode(self, infile, encoding):
if isinstance(infile, str) and encoding:
return infile.decode(encoding).splitlines(True)
return super(ConfigObj, self)._decode(infile, encoding)
if _has_triplequote_bug():
def _get_triple_quote(self, value):
quot = super(ConfigObj, self)._get_triple_quote(value)
if quot == configobj.tdquot:
return configobj.tsquot
return configobj.tdquot
def get_bool(self, section, key):
return self[section].as_bool(key)
def get_value(self, section, name):
# Try [] for the old DEFAULT section.
if section == "DEFAULT":
try:
return self[name]
except KeyError:
pass
return self[section][name]
class Config(object):
"""A configuration policy - what username, editor, gpg needs etc."""
def __init__(self):
super(Config, self).__init__()
def config_id(self):
"""Returns a unique ID for the config."""
raise NotImplementedError(self.config_id)
def get_change_editor(self, old_tree, new_tree):
from breezy import diff
cmd = self._get_change_editor()
if cmd is None:
return None
return diff.DiffFromTool.from_string(cmd, old_tree, new_tree,
sys.stdout)
def _get_signature_checking(self):
"""Template method to override signature checking policy."""
def _get_signing_policy(self):
"""Template method to override signature creation policy."""
option_ref_re = None
def expand_options(self, string, env=None):
"""Expand option references in the string in the configuration context.
:param string: The string containing option to expand.
:param env: An option dict defining additional configuration options or
overriding existing ones.
:returns: The expanded string.
"""
return self._expand_options_in_string(string, env)
def _expand_options_in_list(self, slist, env=None, _ref_stack=None):
"""Expand options in a list of strings in the configuration context.
:param slist: A list of strings.
:param env: An option dict defining additional configuration options or
overriding existing ones.
:param _ref_stack: Private list containing the options being
expanded to detect loops.
:returns: The flatten list of expanded strings.
"""
# expand options in each value separately flattening lists
result = []
for s in slist:
value = self._expand_options_in_string(s, env, _ref_stack)
if isinstance(value, list):
result.extend(value)
else:
result.append(value)
return result
def _expand_options_in_string(self, string, env=None, _ref_stack=None):
"""Expand options in the string in the configuration context.
:param string: The string to be expanded.
:param env: An option dict defining additional configuration options or
overriding existing ones.
:param _ref_stack: Private list containing the options being
expanded to detect loops.
:returns: The expanded string.
"""
if string is None:
# Not much to expand there
return None
if _ref_stack is None:
# What references are currently resolved (to detect loops)
_ref_stack = []
if self.option_ref_re is None:
# We want to match the most embedded reference first (i.e. for
# '{{foo}}' we will get '{foo}',
# for '{bar{baz}}' we will get '{baz}'
self.option_ref_re = re.compile('({[^{}]+})')
result = string
# We need to iterate until no more refs appear ({{foo}} will need two
# iterations for example).
while True:
raw_chunks = self.option_ref_re.split(result)
if len(raw_chunks) == 1:
# Shorcut the trivial case: no refs
return result
chunks = []
list_value = False
# Split will isolate refs so that every other chunk is a ref
chunk_is_ref = False
for chunk in raw_chunks:
if not chunk_is_ref:
if chunk:
# Keep only non-empty strings (or we get bogus empty
# slots when a list value is involved).
chunks.append(chunk)
chunk_is_ref = True
else:
name = chunk[1:-1]
if name in _ref_stack:
raise OptionExpansionLoop(string, _ref_stack)
_ref_stack.append(name)
value = self._expand_option(name, env, _ref_stack)
if value is None:
raise ExpandingUnknownOption(name, string)
if isinstance(value, list):
list_value = True
chunks.extend(value)
else:
chunks.append(value)
_ref_stack.pop()
chunk_is_ref = False
if list_value:
# Once a list appears as the result of an expansion, all
# callers will get a list result. This allows a consistent
# behavior even when some options in the expansion chain
# defined as strings (no comma in their value) but their
# expanded value is a list.
return self._expand_options_in_list(chunks, env, _ref_stack)
else:
result = ''.join(chunks)
return result
def _expand_option(self, name, env, _ref_stack):
if env is not None and name in env:
# Special case, values provided in env takes precedence over
# anything else
value = env[name]
else:
# FIXME: This is a limited implementation, what we really need is a
# way to query the brz config for the value of an option,
# respecting the scope rules (That is, once we implement fallback
# configs, getting the option value should restart from the top
# config, not the current one) -- vila 20101222
value = self.get_user_option(name, expand=False)
if isinstance(value, list):
value = self._expand_options_in_list(value, env, _ref_stack)
else:
value = self._expand_options_in_string(value, env, _ref_stack)
return value
def _get_user_option(self, option_name):
"""Template method to provide a user option."""
return None
def get_user_option(self, option_name, expand=True):
"""Get a generic option - no special process, no default.
:param option_name: The queried option.
:param expand: Whether options references should be expanded.
:returns: The value of the option.
"""
value = self._get_user_option(option_name)
if expand:
if isinstance(value, list):
value = self._expand_options_in_list(value)
elif isinstance(value, dict):
trace.warning('Cannot expand "%s":'
' Dicts do not support option expansion'
% (option_name,))
else:
value = self._expand_options_in_string(value)
for hook in OldConfigHooks['get']:
hook(self, option_name, value)
return value
def get_user_option_as_bool(self, option_name, expand=None, default=None):
"""Get a generic option as a boolean.
:param expand: Allow expanding references to other config values.
:param default: Default value if nothing is configured
:return None if the option doesn't exist or its value can't be
interpreted as a boolean. Returns True or False otherwise.
"""
s = self.get_user_option(option_name, expand=expand)
if s is None:
# The option doesn't exist
return default
val = ui.bool_from_string(s)
if val is None:
# The value can't be interpreted as a boolean
trace.warning('Value "%s" is not a boolean for "%s"',
s, option_name)
return val
def get_user_option_as_list(self, option_name, expand=None):
"""Get a generic option as a list - no special process, no default.
:return None if the option doesn't exist. Returns the value as a list
otherwise.
"""
l = self.get_user_option(option_name, expand=expand)
if isinstance(l, string_types):
# A single value, most probably the user forgot (or didn't care to
# add) the final ','
l = [l]
return l
def _log_format(self):
"""See log_format()."""
return None
def validate_signatures_in_log(self):
"""Show GPG signature validity in log"""
result = self._validate_signatures_in_log()
if result == "true":
result = True
else:
result = False
return result
def _validate_signatures_in_log(self):
"""See validate_signatures_in_log()."""
return None
def _post_commit(self):
"""See Config.post_commit."""
return None
def user_email(self):
"""Return just the email component of a username."""
return extract_email_address(self.username())
def username(self):
"""Return email-style username.
Something similar to 'Martin Pool <mbp@sourcefrog.net>'
$BRZ_EMAIL can be set to override this, then
the concrete policy type is checked, and finally
$EMAIL is examined.
If no username can be found, NoWhoami exception is raised.
"""
v = os.environ.get('BRZ_EMAIL')
if v:
if not PY3:
v = v.decode(osutils.get_user_encoding())
return v
v = self._get_user_id()
if v:
return v
return default_email()
def ensure_username(self):
"""Raise NoWhoami if username is not set.
This method relies on the username() function raising the error.
"""
self.username()
def get_alias(self, value):
return self._get_alias(value)
def _get_alias(self, value):
pass
def get_nickname(self):
return self._get_nickname()
def _get_nickname(self):
return None
def get_bzr_remote_path(self):
try:
return os.environ['BZR_REMOTE_PATH']
except KeyError:
path = self.get_user_option("bzr_remote_path")
if path is None:
path = 'bzr'
return path
def suppress_warning(self, warning):
"""Should the warning be suppressed or emitted.
:param warning: The name of the warning being tested.
:returns: True if the warning should be suppressed, False otherwise.
"""
warnings = self.get_user_option_as_list('suppress_warnings')
if warnings is None or warning not in warnings:
return False
else:
return True
def get_merge_tools(self):
tools = {}
for (oname, value, section, conf_id, parser) in self._get_options():
if oname.startswith('bzr.mergetool.'):
tool_name = oname[len('bzr.mergetool.'):]
tools[tool_name] = self.get_user_option(oname, False)
trace.mutter('loaded merge tools: %r' % tools)
return tools
def find_merge_tool(self, name):
# We fake a defaults mechanism here by checking if the given name can
# be found in the known_merge_tools if it's not found in the config.
# This should be done through the proposed config defaults mechanism
# when it becomes available in the future.
command_line = (self.get_user_option('bzr.mergetool.%s' % name,
expand=False)
or mergetools.known_merge_tools.get(name, None))
return command_line
class _ConfigHooks(hooks.Hooks):
"""A dict mapping hook names and a list of callables for configs.
"""
def __init__(self):
"""Create the default hooks.
These are all empty initially, because by default nothing should get
notified.
"""
super(_ConfigHooks, self).__init__('breezy.config', 'ConfigHooks')
self.add_hook('load',
'Invoked when a config store is loaded.'
' The signature is (store).',
(2, 4))
self.add_hook('save',
'Invoked when a config store is saved.'
' The signature is (store).',
(2, 4))
# The hooks for config options
self.add_hook('get',
'Invoked when a config option is read.'
' The signature is (stack, name, value).',
(2, 4))
self.add_hook('set',
'Invoked when a config option is set.'
' The signature is (stack, name, value).',
(2, 4))
self.add_hook('remove',
'Invoked when a config option is removed.'
' The signature is (stack, name).',
(2, 4))
ConfigHooks = _ConfigHooks()
class _OldConfigHooks(hooks.Hooks):
"""A dict mapping hook names and a list of callables for configs.
"""
def __init__(self):
"""Create the default hooks.
These are all empty initially, because by default nothing should get
notified.
"""
super(_OldConfigHooks, self).__init__('breezy.config', 'OldConfigHooks')
self.add_hook('load',
'Invoked when a config store is loaded.'
' The signature is (config).',
(2, 4))
self.add_hook('save',
'Invoked when a config store is saved.'
' The signature is (config).',
(2, 4))
# The hooks for config options
self.add_hook('get',
'Invoked when a config option is read.'
' The signature is (config, name, value).',
(2, 4))
self.add_hook('set',
'Invoked when a config option is set.'
' The signature is (config, name, value).',
(2, 4))
self.add_hook('remove',
'Invoked when a config option is removed.'
' The signature is (config, name).',
(2, 4))
OldConfigHooks = _OldConfigHooks()
class IniBasedConfig(Config):
"""A configuration policy that draws from ini files."""
def __init__(self, file_name=None):
"""Base class for configuration files using an ini-like syntax.
:param file_name: The configuration file path.
"""
super(IniBasedConfig, self).__init__()
self.file_name = file_name
self.file_name = file_name
self._content = None
self._parser = None
@classmethod
def from_string(cls, str_or_unicode, file_name=None, save=False):
"""Create a config object from a string.
:param str_or_unicode: A string representing the file content. This will
be utf-8 encoded.
:param file_name: The configuration file path.
:param _save: Whether the file should be saved upon creation.
"""
conf = cls(file_name=file_name)
conf._create_from_string(str_or_unicode, save)
return conf
def _create_from_string(self, str_or_unicode, save):
self._content = BytesIO(str_or_unicode.encode('utf-8'))
# Some tests use in-memory configs, some other always need the config
# file to exist on disk.
if save:
self._write_config_file()
def _get_parser(self):
if self._parser is not None:
return self._parser
if self._content is not None:
co_input = self._content
elif self.file_name is None:
raise AssertionError('We have no content to create the config')
else:
co_input = self.file_name
try:
self._parser = ConfigObj(co_input, encoding='utf-8')
except configobj.ConfigObjError as e:
raise ParseConfigError(e.errors, e.config.filename)
except UnicodeDecodeError:
raise ConfigContentError(self.file_name)
# Make sure self.reload() will use the right file name
self._parser.filename = self.file_name
for hook in OldConfigHooks['load']:
hook(self)
return self._parser
def reload(self):
"""Reload the config file from disk."""
if self.file_name is None:
raise AssertionError('We need a file name to reload the config')
if self._parser is not None:
self._parser.reload()
for hook in ConfigHooks['load']:
hook(self)
def _get_matching_sections(self):
"""Return an ordered list of (section_name, extra_path) pairs.
If the section contains inherited configuration, extra_path is
a string containing the additional path components.
"""
section = self._get_section()
if section is not None:
return [(section, '')]
else:
return []
def _get_section(self):
"""Override this to define the section used by the config."""
return "DEFAULT"
def _get_sections(self, name=None):
"""Returns an iterator of the sections specified by ``name``.
:param name: The section name. If None is supplied, the default
configurations are yielded.
:return: A tuple (name, section, config_id) for all sections that will
be walked by user_get_option() in the 'right' order. The first one
is where set_user_option() will update the value.
"""
parser = self._get_parser()
if name is not None:
yield (name, parser[name], self.config_id())
else:
# No section name has been given so we fallback to the configobj
# itself which holds the variables defined outside of any section.
yield (None, parser, self.config_id())
def _get_options(self, sections=None):
"""Return an ordered list of (name, value, section, config_id) tuples.
All options are returned with their associated value and the section
they appeared in. ``config_id`` is a unique identifier for the
configuration file the option is defined in.
:param sections: Default to ``_get_matching_sections`` if not
specified. This gives a better control to daughter classes about
which sections should be searched. This is a list of (name,
configobj) tuples.
"""
opts = []
if sections is None:
parser = self._get_parser()
sections = []
for (section_name, _) in self._get_matching_sections():
try:
section = parser[section_name]
except KeyError:
# This could happen for an empty file for which we define a
# DEFAULT section. FIXME: Force callers to provide sections
# instead ? -- vila 20100930
continue
sections.append((section_name, section))
config_id = self.config_id()
for (section_name, section) in sections:
for (name, value) in section.iteritems():
yield (name, parser._quote(value), section_name,
config_id, parser)
def _get_option_policy(self, section, option_name):
"""Return the policy for the given (section, option_name) pair."""
return POLICY_NONE
def _get_change_editor(self):
return self.get_user_option('change_editor')
def _get_signature_checking(self):
"""See Config._get_signature_checking."""
policy = self._get_user_option('check_signatures')
if policy:
return signature_policy_from_unicode(policy)
def _get_signing_policy(self):
"""See Config._get_signing_policy"""
policy = self._get_user_option('create_signatures')
if policy:
return signing_policy_from_unicode(policy)
def _get_user_id(self):
"""Get the user id from the 'email' key in the current section."""
return self._get_user_option('email')
def _get_user_option(self, option_name):
"""See Config._get_user_option."""
for (section, extra_path) in self._get_matching_sections():
try:
value = self._get_parser().get_value(section, option_name)
except KeyError:
continue
policy = self._get_option_policy(section, option_name)
if policy == POLICY_NONE:
return value
elif policy == POLICY_NORECURSE:
# norecurse items only apply to the exact path
if extra_path:
continue
else:
return value
elif policy == POLICY_APPENDPATH:
if extra_path:
value = urlutils.join(value, extra_path)
return value
else:
raise AssertionError('Unexpected config policy %r' % policy)
else:
return None
def _log_format(self):
"""See Config.log_format."""
return self._get_user_option('log_format')
def _validate_signatures_in_log(self):
"""See Config.validate_signatures_in_log."""
return self._get_user_option('validate_signatures_in_log')
def _acceptable_keys(self):
"""See Config.acceptable_keys."""
return self._get_user_option('acceptable_keys')
def _post_commit(self):
"""See Config.post_commit."""
return self._get_user_option('post_commit')
def _get_alias(self, value):
try:
return self._get_parser().get_value("ALIASES",
value)
except KeyError:
pass
def _get_nickname(self):
return self.get_user_option('nickname')
def remove_user_option(self, option_name, section_name=None):
"""Remove a user option and save the configuration file.
:param option_name: The option to be removed.
:param section_name: The section the option is defined in, default to
the default section.
"""
self.reload()
parser = self._get_parser()
if section_name is None:
section = parser
else:
section = parser[section_name]
try:
del section[option_name]
except KeyError:
raise NoSuchConfigOption(option_name)
self._write_config_file()
for hook in OldConfigHooks['remove']:
hook(self, option_name)
def _write_config_file(self):
if self.file_name is None:
raise AssertionError('We cannot save, self.file_name is None')
conf_dir = os.path.dirname(self.file_name)
ensure_config_dir_exists(conf_dir)
atomic_file = atomicfile.AtomicFile(self.file_name)
self._get_parser().write(atomic_file)
atomic_file.commit()
atomic_file.close()
osutils.copy_ownership_from_path(self.file_name)
for hook in OldConfigHooks['save']:
hook(self)
class LockableConfig(IniBasedConfig):
"""A configuration needing explicit locking for access.
If several processes try to write the config file, the accesses need to be
serialized.
Daughter classes should use the self.lock_write() decorator method when they
upate a config (they call, directly or indirectly, the
``_write_config_file()`` method. These methods (typically ``set_option()``
and variants must reload the config file from disk before calling
``_write_config_file()``), this can be achieved by calling the
``self.reload()`` method. Note that the lock scope should cover both the
reading and the writing of the config file which is why the decorator can't
be applied to ``_write_config_file()`` only.
This should be enough to implement the following logic:
- lock for exclusive write access,
- reload the config file from disk,
- set the new value
- unlock
This logic guarantees that a writer can update a value without erasing an
update made by another writer.
"""
lock_name = 'lock'
def __init__(self, file_name):
super(LockableConfig, self).__init__(file_name=file_name)
self.dir = osutils.dirname(osutils.safe_unicode(self.file_name))
# FIXME: It doesn't matter that we don't provide possible_transports
# below since this is currently used only for local config files ;
# local transports are not shared. But if/when we start using
# LockableConfig for other kind of transports, we will need to reuse
# whatever connection is already established -- vila 20100929
self.transport = transport.get_transport_from_path(self.dir)
self._lock = lockdir.LockDir(self.transport, self.lock_name)
def _create_from_string(self, unicode_bytes, save):
super(LockableConfig, self)._create_from_string(unicode_bytes, False)
if save:
# We need to handle the saving here (as opposed to IniBasedConfig)
# to be able to lock
self.lock_write()
self._write_config_file()
self.unlock()
def lock_write(self, token=None):
"""Takes a write lock in the directory containing the config file.
If the directory doesn't exist it is created.
"""
ensure_config_dir_exists(self.dir)
token = self._lock.lock_write(token)
return lock.LogicalLockResult(self.unlock, token)
def unlock(self):
self._lock.unlock()
def break_lock(self):
self._lock.break_lock()
def remove_user_option(self, option_name, section_name=None):
with self.lock_write():
super(LockableConfig, self).remove_user_option(
option_name, section_name)
def _write_config_file(self):
if self._lock is None or not self._lock.is_held:
# NB: if the following exception is raised it probably means a
# missing call to lock_write() by one of the callers.
raise errors.ObjectNotLocked(self)
super(LockableConfig, self)._write_config_file()
class GlobalConfig(LockableConfig):
"""The configuration that should be used for a specific location."""
def __init__(self):
super(GlobalConfig, self).__init__(file_name=config_filename())
def config_id(self):
return 'breezy'
@classmethod
def from_string(cls, str_or_unicode, save=False):
"""Create a config object from a string.
:param str_or_unicode: A string representing the file content. This
will be utf-8 encoded.
:param save: Whether the file should be saved upon creation.
"""
conf = cls()
conf._create_from_string(str_or_unicode, save)
return conf
def set_user_option(self, option, value):
"""Save option and its value in the configuration."""
with self.lock_write():
self._set_option(option, value, 'DEFAULT')
def get_aliases(self):
"""Return the aliases section."""
if 'ALIASES' in self._get_parser():
return self._get_parser()['ALIASES']
else:
return {}
def set_alias(self, alias_name, alias_command):
"""Save the alias in the configuration."""
with self.lock_write():
self._set_option(alias_name, alias_command, 'ALIASES')
def unset_alias(self, alias_name):
"""Unset an existing alias."""
with self.lock_write():
self.reload()
aliases = self._get_parser().get('ALIASES')
if not aliases or alias_name not in aliases:
raise errors.NoSuchAlias(alias_name)
del aliases[alias_name]
self._write_config_file()
def _set_option(self, option, value, section):
self.reload()
self._get_parser().setdefault(section, {})[option] = value
self._write_config_file()
for hook in OldConfigHooks['set']:
hook(self, option, value)
def _get_sections(self, name=None):
"""See IniBasedConfig._get_sections()."""
parser = self._get_parser()
# We don't give access to options defined outside of any section, we
# used the DEFAULT section by... default.
if name in (None, 'DEFAULT'):
# This could happen for an empty file where the DEFAULT section
# doesn't exist yet. So we force DEFAULT when yielding
name = 'DEFAULT'
if 'DEFAULT' not in parser:
parser['DEFAULT'] = {}
yield (name, parser[name], self.config_id())
def remove_user_option(self, option_name, section_name=None):
if section_name is None:
# We need to force the default section.
section_name = 'DEFAULT'
with self.lock_write():
# We need to avoid the LockableConfig implementation or we'll lock
# twice
super(LockableConfig, self).remove_user_option(
option_name, section_name)
def _iter_for_location_by_parts(sections, location):
"""Keep only the sessions matching the specified location.
:param sections: An iterable of section names.
:param location: An url or a local path to match against.
:returns: An iterator of (section, extra_path, nb_parts) where nb is the
number of path components in the section name, section is the section
name and extra_path is the difference between location and the section
name.
``location`` will always be a local path and never a 'file://' url but the
section names themselves can be in either form.
"""
location_parts = location.rstrip('/').split('/')
for section in sections:
# location is a local path if possible, so we need to convert 'file://'
# urls in section names to local paths if necessary.
# This also avoids having file:///path be a more exact
# match than '/path'.
# FIXME: This still raises an issue if a user defines both file:///path
# *and* /path. Should we raise an error in this case -- vila 20110505
if section.startswith('file://'):
section_path = urlutils.local_path_from_url(section)
else:
section_path = section
section_parts = section_path.rstrip('/').split('/')
matched = True
if len(section_parts) > len(location_parts):
# More path components in the section, they can't match
matched = False
else:
# Rely on zip truncating in length to the length of the shortest
# argument sequence.
for name in zip(location_parts, section_parts):
if not fnmatch.fnmatch(name[0], name[1]):
matched = False
break
if not matched:
continue
# build the path difference between the section and the location
extra_path = '/'.join(location_parts[len(section_parts):])
yield section, extra_path, len(section_parts)
class LocationConfig(LockableConfig):
"""A configuration object that gives the policy for a location."""
def __init__(self, location):
super(LocationConfig, self).__init__(
file_name=locations_config_filename())
# local file locations are looked up by local path, rather than
# by file url. This is because the config file is a user
# file, and we would rather not expose the user to file urls.
if location.startswith('file://'):
location = urlutils.local_path_from_url(location)
self.location = location
def config_id(self):
return 'locations'
@classmethod
def from_string(cls, str_or_unicode, location, save=False):
"""Create a config object from a string.
:param str_or_unicode: A string representing the file content. This will
be utf-8 encoded.
:param location: The location url to filter the configuration.
:param save: Whether the file should be saved upon creation.
"""
conf = cls(location)
conf._create_from_string(str_or_unicode, save)
return conf
def _get_matching_sections(self):
"""Return an ordered list of section names matching this location."""
# put the longest (aka more specific) locations first
matches = sorted(
_iter_for_location_by_parts(self._get_parser(), self.location),
key=lambda match: (match[2], match[0]),
reverse=True)
for (section, extra_path, length) in matches:
yield section, extra_path
# should we stop looking for parent configs here?
try:
if self._get_parser()[section].as_bool('ignore_parents'):
break
except KeyError:
pass
def _get_sections(self, name=None):
"""See IniBasedConfig._get_sections()."""
# We ignore the name here as the only sections handled are named with
# the location path and we don't expose embedded sections either.
parser = self._get_parser()
for name, extra_path in self._get_matching_sections():
yield (name, parser[name], self.config_id())
def _get_option_policy(self, section, option_name):
"""Return the policy for the given (section, option_name) pair."""
# check for the old 'recurse=False' flag
try:
recurse = self._get_parser()[section].as_bool('recurse')
except KeyError:
recurse = True
if not recurse:
return POLICY_NORECURSE
policy_key = option_name + ':policy'
try:
policy_name = self._get_parser()[section][policy_key]
except KeyError:
policy_name = None
return _policy_value[policy_name]
def _set_option_policy(self, section, option_name, option_policy):
"""Set the policy for the given option name in the given section."""
policy_key = option_name + ':policy'
policy_name = _policy_name[option_policy]
if policy_name is not None:
self._get_parser()[section][policy_key] = policy_name
else:
if policy_key in self._get_parser()[section]:
del self._get_parser()[section][policy_key]
def set_user_option(self, option, value, store=STORE_LOCATION):
"""Save option and its value in the configuration."""
if store not in [STORE_LOCATION,
STORE_LOCATION_NORECURSE,
STORE_LOCATION_APPENDPATH]:
raise ValueError('bad storage policy %r for %r' %
(store, option))
with self.lock_write():
self.reload()
location = self.location
if location.endswith('/'):
location = location[:-1]
parser = self._get_parser()
if location not in parser and not location + '/' in parser:
parser[location] = {}
elif location + '/' in parser:
location = location + '/'
parser[location][option] = value
# the allowed values of store match the config policies
self._set_option_policy(location, option, store)
self._write_config_file()
for hook in OldConfigHooks['set']:
hook(self, option, value)
class BranchConfig(Config):
"""A configuration object giving the policy for a branch."""
def __init__(self, branch):
super(BranchConfig, self).__init__()
self._location_config = None
self._branch_data_config = None
self._global_config = None
self.branch = branch
self.option_sources = (self._get_location_config,
self._get_branch_data_config,
self._get_global_config)
def config_id(self):
return 'branch'
def _get_branch_data_config(self):
if self._branch_data_config is None:
self._branch_data_config = TreeConfig(self.branch)
self._branch_data_config.config_id = self.config_id
return self._branch_data_config
def _get_location_config(self):
if self._location_config is None:
self._location_config = LocationConfig(self.branch.base)
return self._location_config
def _get_global_config(self):
if self._global_config is None:
self._global_config = GlobalConfig()
return self._global_config
def _get_best_value(self, option_name):
"""This returns a user option from local, tree or global config.
They are tried in that order. Use get_safe_value if trusted values
are necessary.
"""
for source in self.option_sources:
value = getattr(source(), option_name)()
if value is not None:
return value
return None
def _get_safe_value(self, option_name):
"""This variant of get_best_value never returns untrusted values.
It does not return values from the branch data, because the branch may
not be controlled by the user.
We may wish to allow locations.conf to control whether branches are
trusted in the future.
"""
for source in (self._get_location_config, self._get_global_config):
value = getattr(source(), option_name)()
if value is not None:
return value
return None
def _get_user_id(self):
"""Return the full user id for the branch.
e.g. "John Hacker <jhacker@example.com>"
This is looked up in the email controlfile for the branch.
"""
return self._get_best_value('_get_user_id')
def _get_change_editor(self):
return self._get_best_value('_get_change_editor')
def _get_signature_checking(self):
"""See Config._get_signature_checking."""
return self._get_best_value('_get_signature_checking')
def _get_signing_policy(self):
"""See Config._get_signing_policy."""
return self._get_best_value('_get_signing_policy')
def _get_user_option(self, option_name):
"""See Config._get_user_option."""
for source in self.option_sources:
value = source()._get_user_option(option_name)
if value is not None:
return value
return None
def _get_sections(self, name=None):
"""See IniBasedConfig.get_sections()."""
for source in self.option_sources:
for section in source()._get_sections(name):
yield section
def _get_options(self, sections=None):
opts = []
# First the locations options
for option in self._get_location_config()._get_options():
yield option
# Then the branch options
branch_config = self._get_branch_data_config()
if sections is None:
sections = [('DEFAULT', branch_config._get_parser())]
# FIXME: We shouldn't have to duplicate the code in IniBasedConfig but
# Config itself has no notion of sections :( -- vila 20101001
config_id = self.config_id()
for (section_name, section) in sections:
for (name, value) in section.iteritems():
yield (name, value, section_name,
config_id, branch_config._get_parser())
# Then the global options
for option in self._get_global_config()._get_options():
yield option
def set_user_option(self, name, value, store=STORE_BRANCH,
warn_masked=False):
if store == STORE_BRANCH:
self._get_branch_data_config().set_option(value, name)
elif store == STORE_GLOBAL:
self._get_global_config().set_user_option(name, value)
else:
self._get_location_config().set_user_option(name, value, store)
if not warn_masked:
return
if store in (STORE_GLOBAL, STORE_BRANCH):
mask_value = self._get_location_config().get_user_option(name)
if mask_value is not None:
trace.warning('Value "%s" is masked by "%s" from'
' locations.conf', value, mask_value)
else:
if store == STORE_GLOBAL:
branch_config = self._get_branch_data_config()
mask_value = branch_config.get_user_option(name)
if mask_value is not None:
trace.warning('Value "%s" is masked by "%s" from'
' branch.conf', value, mask_value)
def remove_user_option(self, option_name, section_name=None):
self._get_branch_data_config().remove_option(option_name, section_name)
def _post_commit(self):
"""See Config.post_commit."""
return self._get_safe_value('_post_commit')
def _get_nickname(self):
value = self._get_explicit_nickname()
if value is not None:
return value
if self.branch.name:
return self.branch.name
return urlutils.unescape(self.branch.base.split('/')[-2])
def has_explicit_nickname(self):
"""Return true if a nickname has been explicitly assigned."""
return self._get_explicit_nickname() is not None
def _get_explicit_nickname(self):
return self._get_best_value('_get_nickname')
def _log_format(self):
"""See Config.log_format."""
return self._get_best_value('_log_format')
def _validate_signatures_in_log(self):
"""See Config.validate_signatures_in_log."""
return self._get_best_value('_validate_signatures_in_log')
def _acceptable_keys(self):
"""See Config.acceptable_keys."""
return self._get_best_value('_acceptable_keys')
def ensure_config_dir_exists(path=None):
"""Make sure a configuration directory exists.
This makes sure that the directory exists.
On windows, since configuration directories are 2 levels deep,
it makes sure both the directory and the parent directory exists.
"""
if path is None:
path = config_dir()
if not os.path.isdir(path):
if sys.platform == 'win32':
parent_dir = os.path.dirname(path)
if not os.path.isdir(parent_dir):
trace.mutter('creating config parent directory: %r', parent_dir)
os.mkdir(parent_dir)
trace.mutter('creating config directory: %r', path)
os.mkdir(path)
osutils.copy_ownership_from_path(path)
def bazaar_config_dir():
"""Return per-user configuration directory as unicode string
By default this is %APPDATA%/bazaar/2.0 on Windows, ~/.bazaar on Mac OS X
and Linux. On Mac OS X and Linux, if there is a $XDG_CONFIG_HOME/bazaar directory,
that will be used instead.
TODO: Global option --config-dir to override this.
"""
base = osutils.path_from_environ('BZR_HOME')
if sys.platform == 'win32':
if base is None:
base = win32utils.get_appdata_location()
if base is None:
base = win32utils.get_home_location()
return osutils.pathjoin(base, 'bazaar', '2.0')
if base is None:
xdg_dir = osutils.path_from_environ('XDG_CONFIG_HOME')
if xdg_dir is None:
xdg_dir = osutils.pathjoin(osutils._get_home_dir(), ".config")
xdg_dir = osutils.pathjoin(xdg_dir, 'bazaar')
if osutils.isdir(xdg_dir):
trace.mutter(
"Using configuration in XDG directory %s." % xdg_dir)
return xdg_dir
base = osutils._get_home_dir()
return osutils.pathjoin(base, ".bazaar")
def _config_dir():
"""Return per-user configuration directory as unicode string
By default this is %APPDATA%/breezy on Windows, $XDG_CONFIG_HOME/breezy on
Mac OS X and Linux. If the breezy config directory doesn't exist but
the bazaar one (see bazaar_config_dir()) does, use that instead.
"""
# TODO: Global option --config-dir to override this.
base = osutils.path_from_environ('BRZ_HOME')
if sys.platform == 'win32':
if base is None:
base = win32utils.get_appdata_location()
if base is None:
base = win32utils.get_home_location()
if base is None:
base = osutils.path_from_environ('XDG_CONFIG_HOME')
if base is None:
base = osutils.pathjoin(osutils._get_home_dir(), ".config")
breezy_dir = osutils.pathjoin(base, 'breezy')
if osutils.isdir(breezy_dir):
return (breezy_dir, 'breezy')
# If the breezy directory doesn't exist, but the bazaar one does, use that:
bazaar_dir = bazaar_config_dir()
if osutils.isdir(bazaar_dir):
trace.mutter(
"Using Bazaar configuration directory (%s)", bazaar_dir)
return (bazaar_dir, 'bazaar')
return (breezy_dir, 'breezy')
def config_dir():
"""Return per-user configuration directory as unicode string
By default this is %APPDATA%/breezy on Windows, $XDG_CONFIG_HOME/breezy on
Mac OS X and Linux. If the breezy config directory doesn't exist but
the bazaar one (see bazaar_config_dir()) does, use that instead.
"""
return _config_dir()[0]
def config_filename():
"""Return per-user configuration ini file filename."""
path, kind = _config_dir()
if kind == 'bazaar':
return osutils.pathjoin(path, 'bazaar.conf')
else:
return osutils.pathjoin(path, 'breezy.conf')
def locations_config_filename():
"""Return per-user configuration ini file filename."""
return osutils.pathjoin(config_dir(), 'locations.conf')
def authentication_config_filename():
"""Return per-user authentication ini file filename."""
return osutils.pathjoin(config_dir(), 'authentication.conf')
def user_ignore_config_filename():
"""Return the user default ignore filename"""
return osutils.pathjoin(config_dir(), 'ignore')
def crash_dir():
"""Return the directory name to store crash files.
This doesn't implicitly create it.
On Windows it's in the config directory; elsewhere it's /var/crash
which may be monitored by apport. It can be overridden by
$APPORT_CRASH_DIR.
"""
if sys.platform == 'win32':
return osutils.pathjoin(config_dir(), 'Crash')
else:
# XXX: hardcoded in apport_python_hook.py; therefore here too -- mbp
# 2010-01-31
return os.environ.get('APPORT_CRASH_DIR', '/var/crash')
def xdg_cache_dir():
# See http://standards.freedesktop.org/basedir-spec/latest/ar01s03.html
# Possibly this should be different on Windows?
e = os.environ.get('XDG_CACHE_HOME', None)
if e:
return e
else:
return os.path.expanduser('~/.cache')
def _get_default_mail_domain(mailname_file='/etc/mailname'):
"""If possible, return the assumed default email domain.
:returns: string mail domain, or None.
"""
if sys.platform == 'win32':
# No implementation yet; patches welcome
return None
try:
f = open(mailname_file)
except (IOError, OSError) as e:
return None
try:
domain = f.readline().strip()
return domain
finally:
f.close()
def default_email():
v = os.environ.get('BRZ_EMAIL')
if v:
if not PY3:
v = v.decode(osutils.get_user_encoding())
return v
v = os.environ.get('EMAIL')
if v:
if not PY3:
v = v.decode(osutils.get_user_encoding())
return v
name, email = _auto_user_id()
if name and email:
return u'%s <%s>' % (name, email)
elif email:
return email
raise NoWhoami()
def _auto_user_id():
"""Calculate automatic user identification.
:returns: (realname, email), either of which may be None if they can't be
determined.
Only used when none is set in the environment or the id file.
This only returns an email address if we can be fairly sure the
address is reasonable, ie if /etc/mailname is set on unix.
This doesn't use the FQDN as the default domain because that may be
slow, and it doesn't use the hostname alone because that's not normally
a reasonable address.
"""
if sys.platform == 'win32':
# No implementation to reliably determine Windows default mail
# address; please add one.
return None, None
default_mail_domain = _get_default_mail_domain()
if not default_mail_domain:
return None, None
import pwd
uid = os.getuid()
try:
w = pwd.getpwuid(uid)
except KeyError:
trace.mutter('no passwd entry for uid %d?' % uid)
return None, None
# we try utf-8 first, because on many variants (like Linux),
# /etc/passwd "should" be in utf-8, and because it's unlikely to give
# false positives. (many users will have their user encoding set to
# latin-1, which cannot raise UnicodeError.)
gecos = w.pw_gecos
if isinstance(gecos, bytes):
try:
gecos = gecos.decode('utf-8')
encoding = 'utf-8'
except UnicodeError:
try:
encoding = osutils.get_user_encoding()
gecos = gecos.decode(encoding)
except UnicodeError as e:
trace.mutter("cannot decode passwd entry %s" % w)
return None, None
username = w.pw_name
if isinstance(username, bytes):
try:
username = username.decode(encoding)
except UnicodeError as e:
trace.mutter("cannot decode passwd entry %s" % w)
return None, None
comma = gecos.find(',')
if comma == -1:
realname = gecos
else:
realname = gecos[:comma]
return realname, (username + '@' + default_mail_domain)
def parse_username(username):
"""Parse e-mail username and return a (name, address) tuple."""
match = re.match(r'(.*?)\s*<?([\w+.-]+@[\w+.-]+)>?', username)
if match is None:
return (username, '')
else:
return (match.group(1), match.group(2))
def extract_email_address(e):
"""Return just the address part of an email string.
That is just the user@domain part, nothing else.
This part is required to contain only ascii characters.
If it can't be extracted, raises an error.
>>> extract_email_address('Jane Tester <jane@test.com>')
"jane@test.com"
"""
name, email = parse_username(e)
if not email:
raise NoEmailInUsername(e)
return email
class TreeConfig(IniBasedConfig):
"""Branch configuration data associated with its contents, not location"""
# XXX: Really needs a better name, as this is not part of the tree! -- mbp 20080507
def __init__(self, branch):
self._config = branch._get_config()
self.branch = branch
def _get_parser(self, file=None):
if file is not None:
return IniBasedConfig._get_parser(file)
return self._config._get_configobj()
def get_option(self, name, section=None, default=None):
with self.branch.lock_read():
return self._config.get_option(name, section, default)
def set_option(self, value, name, section=None):
"""Set a per-branch configuration option"""
# FIXME: We shouldn't need to lock explicitly here but rather rely on
# higher levels providing the right lock -- vila 20101004
with self.branch.lock_write():
self._config.set_option(value, name, section)
def remove_option(self, option_name, section_name=None):
# FIXME: We shouldn't need to lock explicitly here but rather rely on
# higher levels providing the right lock -- vila 20101004
with self.branch.lock_write():
self._config.remove_option(option_name, section_name)
_authentication_config_permission_errors = set()
class AuthenticationConfig(object):
"""The authentication configuration file based on a ini file.
Implements the authentication.conf file described in
doc/developers/authentication-ring.txt.
"""
def __init__(self, _file=None):
self._config = None # The ConfigObj
if _file is None:
self._filename = authentication_config_filename()
self._input = self._filename = authentication_config_filename()
self._check_permissions()
else:
# Tests can provide a string as _file
self._filename = None
self._input = _file
def _get_config(self):
if self._config is not None:
return self._config
try:
# FIXME: Should we validate something here ? Includes: empty
# sections are useless, at least one of
# user/password/password_encoding should be defined, etc.
# Note: the encoding below declares that the file itself is utf-8
# encoded, but the values in the ConfigObj are always Unicode.
self._config = ConfigObj(self._input, encoding='utf-8')
except configobj.ConfigObjError as e:
raise ParseConfigError(e.errors, e.config.filename)
except UnicodeError:
raise ConfigContentError(self._filename)
return self._config
def _check_permissions(self):
"""Check permission of auth file are user read/write able only."""
try:
st = os.stat(self._filename)
except OSError as e:
if e.errno != errno.ENOENT:
trace.mutter('Unable to stat %r: %r', self._filename, e)
return
mode = stat.S_IMODE(st.st_mode)
if ((stat.S_IXOTH | stat.S_IWOTH | stat.S_IROTH | stat.S_IXGRP |
stat.S_IWGRP | stat.S_IRGRP ) & mode):
# Only warn once
if (not self._filename in _authentication_config_permission_errors
and not GlobalConfig().suppress_warning(
'insecure_permissions')):
trace.warning("The file '%s' has insecure "
"file permissions. Saved passwords may be accessible "
"by other users.", self._filename)
_authentication_config_permission_errors.add(self._filename)
def _save(self):
"""Save the config file, only tests should use it for now."""
conf_dir = os.path.dirname(self._filename)
ensure_config_dir_exists(conf_dir)
fd = os.open(self._filename, os.O_RDWR|os.O_CREAT, 0o600)
try:
f = os.fdopen(fd, 'wb')
self._get_config().write(f)
finally:
f.close()
def _set_option(self, section_name, option_name, value):
"""Set an authentication configuration option"""
conf = self._get_config()
section = conf.get(section_name)
if section is None:
conf[section] = {}
section = conf[section]
section[option_name] = value
self._save()
def get_credentials(self, scheme, host, port=None, user=None, path=None,
realm=None):
"""Returns the matching credentials from authentication.conf file.
:param scheme: protocol
:param host: the server address
:param port: the associated port (optional)
:param user: login (optional)
:param path: the absolute path on the server (optional)
:param realm: the http authentication realm (optional)
:return: A dict containing the matching credentials or None.
This includes:
- name: the section name of the credentials in the
authentication.conf file,
- user: can't be different from the provided user if any,
- scheme: the server protocol,
- host: the server address,
- port: the server port (can be None),
- path: the absolute server path (can be None),
- realm: the http specific authentication realm (can be None),
- password: the decoded password, could be None if the credential
defines only the user
- verify_certificates: https specific, True if the server
certificate should be verified, False otherwise.
"""
credentials = None
for auth_def_name, auth_def in self._get_config().iteritems():
if not isinstance(auth_def, configobj.Section):
raise ValueError("%s defined outside a section" % auth_def_name)
a_scheme, a_host, a_user, a_path = map(
auth_def.get, ['scheme', 'host', 'user', 'path'])
try:
a_port = auth_def.as_int('port')
except KeyError:
a_port = None
except ValueError:
raise ValueError("'port' not numeric in %s" % auth_def_name)
try:
a_verify_certificates = auth_def.as_bool('verify_certificates')
except KeyError:
a_verify_certificates = True
except ValueError:
raise ValueError(
"'verify_certificates' not boolean in %s" % auth_def_name)
# Attempt matching
if a_scheme is not None and scheme != a_scheme:
continue
if a_host is not None:
if not (host == a_host
or (a_host.startswith('.') and host.endswith(a_host))):
continue
if a_port is not None and port != a_port:
continue
if (a_path is not None and path is not None
and not path.startswith(a_path)):
continue
if (a_user is not None and user is not None
and a_user != user):
# Never contradict the caller about the user to be used
continue
if a_user is None:
# Can't find a user
continue
# Prepare a credentials dictionary with additional keys
# for the credential providers
credentials = dict(name=auth_def_name,
user=a_user,
scheme=a_scheme,
host=host,
port=port,
path=path,
realm=realm,
password=auth_def.get('password', None),
verify_certificates=a_verify_certificates)
# Decode the password in the credentials (or get one)
self.decode_password(credentials,
auth_def.get('password_encoding', None))
if 'auth' in debug.debug_flags:
trace.mutter("Using authentication section: %r", auth_def_name)
break
if credentials is None:
# No credentials were found in authentication.conf, try the fallback
# credentials stores.
credentials = credential_store_registry.get_fallback_credentials(
scheme, host, port, user, path, realm)
return credentials
def set_credentials(self, name, host, user, scheme=None, password=None,
port=None, path=None, verify_certificates=None,
realm=None):
"""Set authentication credentials for a host.
Any existing credentials with matching scheme, host, port and path
will be deleted, regardless of name.
:param name: An arbitrary name to describe this set of credentials.
:param host: Name of the host that accepts these credentials.
:param user: The username portion of these credentials.
:param scheme: The URL scheme (e.g. ssh, http) the credentials apply
to.
:param password: Password portion of these credentials.
:param port: The IP port on the host that these credentials apply to.
:param path: A filesystem path on the host that these credentials
apply to.
:param verify_certificates: On https, verify server certificates if
True.
:param realm: The http authentication realm (optional).
"""
values = {'host': host, 'user': user}
if password is not None:
values['password'] = password
if scheme is not None:
values['scheme'] = scheme
if port is not None:
values['port'] = '%d' % port
if path is not None:
values['path'] = path
if verify_certificates is not None:
values['verify_certificates'] = str(verify_certificates)
if realm is not None:
values['realm'] = realm
config = self._get_config()
for_deletion = []
for section, existing_values in config.iteritems():
for key in ('scheme', 'host', 'port', 'path', 'realm'):
if existing_values.get(key) != values.get(key):
break
else:
del config[section]
config.update({name: values})
self._save()
def get_user(self, scheme, host, port=None, realm=None, path=None,
prompt=None, ask=False, default=None):
"""Get a user from authentication file.
:param scheme: protocol
:param host: the server address
:param port: the associated port (optional)
:param realm: the realm sent by the server (optional)
:param path: the absolute path on the server (optional)
:param ask: Ask the user if there is no explicitly configured username
(optional)
:param default: The username returned if none is defined (optional).
:return: The found user.
"""
credentials = self.get_credentials(scheme, host, port, user=None,
path=path, realm=realm)
if credentials is not None:
user = credentials['user']
else:
user = None
if user is None:
if ask:
if prompt is None:
# Create a default prompt suitable for most cases
prompt = u'%s' % (scheme.upper(),) + u' %(host)s username'
# Special handling for optional fields in the prompt
if port is not None:
prompt_host = '%s:%d' % (host, port)
else:
prompt_host = host
user = ui.ui_factory.get_username(prompt, host=prompt_host)
else:
user = default
return user
def get_password(self, scheme, host, user, port=None,
realm=None, path=None, prompt=None):
"""Get a password from authentication file or prompt the user for one.
:param scheme: protocol
:param host: the server address
:param port: the associated port (optional)
:param user: login
:param realm: the realm sent by the server (optional)
:param path: the absolute path on the server (optional)
:return: The found password or the one entered by the user.
"""
credentials = self.get_credentials(scheme, host, port, user, path,
realm)
if credentials is not None:
password = credentials['password']
if password is not None and scheme is 'ssh':
trace.warning('password ignored in section [%s],'
' use an ssh agent instead'
% credentials['name'])
password = None
else:
password = None
# Prompt user only if we could't find a password
if password is None:
if prompt is None:
# Create a default prompt suitable for most cases
prompt = u'%s' % scheme.upper() + u' %(user)s@%(host)s password'
# Special handling for optional fields in the prompt
if port is not None:
prompt_host = '%s:%d' % (host, port)
else:
prompt_host = host
password = ui.ui_factory.get_password(prompt,
host=prompt_host, user=user)
return password
def decode_password(self, credentials, encoding):
try:
cs = credential_store_registry.get_credential_store(encoding)
except KeyError:
raise ValueError('%r is not a known password_encoding' % encoding)
credentials['password'] = cs.decode_password(credentials)
return credentials
class CredentialStoreRegistry(registry.Registry):
"""A class that registers credential stores.
A credential store provides access to credentials via the password_encoding
field in authentication.conf sections.
Except for stores provided by brz itself, most stores are expected to be
provided by plugins that will therefore use
register_lazy(password_encoding, module_name, member_name, help=help,
fallback=fallback) to install themselves.
A fallback credential store is one that is queried if no credentials can be
found via authentication.conf.
"""
def get_credential_store(self, encoding=None):
cs = self.get(encoding)
if callable(cs):
cs = cs()
return cs
def is_fallback(self, name):
"""Check if the named credentials store should be used as fallback."""
return self.get_info(name)
def get_fallback_credentials(self, scheme, host, port=None, user=None,
path=None, realm=None):
"""Request credentials from all fallback credentials stores.
The first credentials store that can provide credentials wins.
"""
credentials = None
for name in self.keys():
if not self.is_fallback(name):
continue
cs = self.get_credential_store(name)
credentials = cs.get_credentials(scheme, host, port, user,
path, realm)
if credentials is not None:
# We found some credentials
break
return credentials
def register(self, key, obj, help=None, override_existing=False,
fallback=False):
"""Register a new object to a name.
:param key: This is the key to use to request the object later.
:param obj: The object to register.
:param help: Help text for this entry. This may be a string or
a callable. If it is a callable, it should take two
parameters (registry, key): this registry and the key that
the help was registered under.
:param override_existing: Raise KeyErorr if False and something has
already been registered for that key. If True, ignore if there
is an existing key (always register the new value).
:param fallback: Whether this credential store should be
used as fallback.
"""
return super(CredentialStoreRegistry,
self).register(key, obj, help, info=fallback,
override_existing=override_existing)
def register_lazy(self, key, module_name, member_name,
help=None, override_existing=False,
fallback=False):
"""Register a new credential store to be loaded on request.
:param module_name: The python path to the module. Such as 'os.path'.
:param member_name: The member of the module to return. If empty or
None, get() will return the module itself.
:param help: Help text for this entry. This may be a string or
a callable.
:param override_existing: If True, replace the existing object
with the new one. If False, if there is already something
registered with the same key, raise a KeyError
:param fallback: Whether this credential store should be
used as fallback.
"""
return super(CredentialStoreRegistry, self).register_lazy(
key, module_name, member_name, help,
info=fallback, override_existing=override_existing)
credential_store_registry = CredentialStoreRegistry()
class CredentialStore(object):
"""An abstract class to implement storage for credentials"""
def decode_password(self, credentials):
"""Returns a clear text password for the provided credentials."""
raise NotImplementedError(self.decode_password)
def get_credentials(self, scheme, host, port=None, user=None, path=None,
realm=None):
"""Return the matching credentials from this credential store.
This method is only called on fallback credential stores.
"""
raise NotImplementedError(self.get_credentials)
class PlainTextCredentialStore(CredentialStore):
__doc__ = """Plain text credential store for the authentication.conf file"""
def decode_password(self, credentials):
"""See CredentialStore.decode_password."""
return credentials['password']
credential_store_registry.register('plain', PlainTextCredentialStore,
help=PlainTextCredentialStore.__doc__)
credential_store_registry.default_key = 'plain'
class Base64CredentialStore(CredentialStore):
__doc__ = """Base64 credential store for the authentication.conf file"""
def decode_password(self, credentials):
"""See CredentialStore.decode_password."""
# GZ 2012-07-28: Will raise binascii.Error if password is not base64,
# should probably propogate as something more useful.
return base64.decodestring(credentials['password'])
credential_store_registry.register('base64', Base64CredentialStore,
help=Base64CredentialStore.__doc__)
class BzrDirConfig(object):
def __init__(self, bzrdir):
self._bzrdir = bzrdir
self._config = bzrdir._get_config()
def set_default_stack_on(self, value):
"""Set the default stacking location.
It may be set to a location, or None.
This policy affects all branches contained by this control dir, except
for those under repositories.
"""
if self._config is None:
raise errors.BzrError("Cannot set configuration in %s"
% self._bzrdir)
if value is None:
self._config.set_option('', 'default_stack_on')
else:
self._config.set_option(value, 'default_stack_on')
def get_default_stack_on(self):
"""Return the default stacking location.
This will either be a location, or None.
This policy affects all branches contained by this control dir, except
for those under repositories.
"""
if self._config is None:
return None
value = self._config.get_option('default_stack_on')
if value == '':
value = None
return value
class TransportConfig(object):
"""A Config that reads/writes a config file on a Transport.
It is a low-level object that considers config data to be name/value pairs
that may be associated with a section. Assigning meaning to these values
is done at higher levels like TreeConfig.
"""
def __init__(self, transport, filename):
self._transport = transport
self._filename = filename
def get_option(self, name, section=None, default=None):
"""Return the value associated with a named option.
:param name: The name of the value
:param section: The section the option is in (if any)
:param default: The value to return if the value is not set
:return: The value or default value
"""
configobj = self._get_configobj()
if section is None:
section_obj = configobj
else:
try:
section_obj = configobj[section]
except KeyError:
return default
value = section_obj.get(name, default)
for hook in OldConfigHooks['get']:
hook(self, name, value)
return value
def set_option(self, value, name, section=None):
"""Set the value associated with a named option.
:param value: The value to set
:param name: The name of the value to set
:param section: The section the option is in (if any)
"""
configobj = self._get_configobj()
if section is None:
configobj[name] = value
else:
configobj.setdefault(section, {})[name] = value
for hook in OldConfigHooks['set']:
hook(self, name, value)
self._set_configobj(configobj)
def remove_option(self, option_name, section_name=None):
configobj = self._get_configobj()
if section_name is None:
del configobj[option_name]
else:
del configobj[section_name][option_name]
for hook in OldConfigHooks['remove']:
hook(self, option_name)
self._set_configobj(configobj)
def _get_config_file(self):
try:
f = BytesIO(self._transport.get_bytes(self._filename))
for hook in OldConfigHooks['load']:
hook(self)
return f
except errors.NoSuchFile:
return BytesIO()
except errors.PermissionDenied as e:
trace.warning("Permission denied while trying to open "
"configuration file %s.", urlutils.unescape_for_display(
urlutils.join(self._transport.base, self._filename), "utf-8"))
return BytesIO()
def _external_url(self):
return urlutils.join(self._transport.external_url(), self._filename)
def _get_configobj(self):
f = self._get_config_file()
try:
try:
conf = ConfigObj(f, encoding='utf-8')
except configobj.ConfigObjError as e:
raise ParseConfigError(e.errors, self._external_url())
except UnicodeDecodeError:
raise ConfigContentError(self._external_url())
finally:
f.close()
return conf
def _set_configobj(self, configobj):
out_file = BytesIO()
configobj.write(out_file)
out_file.seek(0)
self._transport.put_file(self._filename, out_file)
for hook in OldConfigHooks['save']:
hook(self)
class Option(object):
"""An option definition.
The option *values* are stored in config files and found in sections.
Here we define various properties about the option itself, its default
value, how to convert it from stores, what to do when invalid values are
encoutered, in which config files it can be stored.
"""
def __init__(self, name, override_from_env=None,
default=None, default_from_env=None,
help=None, from_unicode=None, invalid=None, unquote=True):
"""Build an option definition.
:param name: the name used to refer to the option.
:param override_from_env: A list of environment variables which can
provide override any configuration setting.
:param default: the default value to use when none exist in the config
stores. This is either a string that ``from_unicode`` will convert
into the proper type, a callable returning a unicode string so that
``from_unicode`` can be used on the return value, or a python
object that can be stringified (so only the empty list is supported
for example).
:param default_from_env: A list of environment variables which can
provide a default value. 'default' will be used only if none of the
variables specified here are set in the environment.
:param help: a doc string to explain the option to the user.
:param from_unicode: a callable to convert the unicode string
representing the option value in a store or its default value.
:param invalid: the action to be taken when an invalid value is
encountered in a store. This is called only when from_unicode is
invoked to convert a string and returns None or raise ValueError or
TypeError. Accepted values are: None (ignore invalid values),
'warning' (emit a warning), 'error' (emit an error message and
terminates).
:param unquote: should the unicode value be unquoted before conversion.
This should be used only when the store providing the values cannot
safely unquote them (see http://pad.lv/906897). It is provided so
daughter classes can handle the quoting themselves.
"""
if override_from_env is None:
override_from_env = []
if default_from_env is None:
default_from_env = []
self.name = name
self.override_from_env = override_from_env
# Convert the default value to a unicode string so all values are
# strings internally before conversion (via from_unicode) is attempted.
if default is None:
self.default = None
elif isinstance(default, list):
# Only the empty list is supported
if default:
raise AssertionError(
'Only empty lists are supported as default values')
self.default = u','
elif isinstance(default, (binary_type, text_type, bool, int, float)):
# Rely on python to convert strings, booleans and integers
self.default = u'%s' % (default,)
elif callable(default):
self.default = default
else:
# other python objects are not expected
raise AssertionError('%r is not supported as a default value'
% (default,))
self.default_from_env = default_from_env
self._help = help
self.from_unicode = from_unicode
self.unquote = unquote
if invalid and invalid not in ('warning', 'error'):
raise AssertionError("%s not supported for 'invalid'" % (invalid,))
self.invalid = invalid
@property
def help(self):
return self._help
def convert_from_unicode(self, store, unicode_value):
if self.unquote and store is not None and unicode_value is not None:
unicode_value = store.unquote(unicode_value)
if self.from_unicode is None or unicode_value is None:
# Don't convert or nothing to convert
return unicode_value
try:
converted = self.from_unicode(unicode_value)
except (ValueError, TypeError):
# Invalid values are ignored
converted = None
if converted is None and self.invalid is not None:
# The conversion failed
if self.invalid == 'warning':
trace.warning('Value "%s" is not valid for "%s"',
unicode_value, self.name)
elif self.invalid == 'error':
raise ConfigOptionValueError(self.name, unicode_value)
return converted
def get_override(self):
value = None
for var in self.override_from_env:
try:
# If the env variable is defined, its value takes precedence
value = os.environ[var].decode(osutils.get_user_encoding())
break
except KeyError:
continue
return value
def get_default(self):
value = None
for var in self.default_from_env:
try:
# If the env variable is defined, its value is the default one
value = os.environ[var]
if not PY3:
value = value.decode(osutils.get_user_encoding())
break
except KeyError:
continue
if value is None:
# Otherwise, fallback to the value defined at registration
if callable(self.default):
value = self.default()
if not isinstance(value, text_type):
raise AssertionError(
"Callable default value for '%s' should be unicode"
% (self.name))
else:
value = self.default
return value
def get_help_topic(self):
return self.name
def get_help_text(self, additional_see_also=None, plain=True):
result = self.help
from breezy import help_topics
result += help_topics._format_see_also(additional_see_also)
if plain:
result = help_topics.help_as_plain_text(result)
return result
# Predefined converters to get proper values from store
def bool_from_store(unicode_str):
return ui.bool_from_string(unicode_str)
def int_from_store(unicode_str):
return int(unicode_str)
_unit_suffixes = dict(K=10**3, M=10**6, G=10**9)
def int_SI_from_store(unicode_str):
"""Convert a human readable size in SI units, e.g 10MB into an integer.
Accepted suffixes are K,M,G. It is case-insensitive and may be followed
by a trailing b (i.e. Kb, MB). This is intended to be practical and not
pedantic.
:return Integer, expanded to its base-10 value if a proper SI unit is
found, None otherwise.
"""
regexp = "^(\\d+)(([" + ''.join(_unit_suffixes) + "])b?)?$"
p = re.compile(regexp, re.IGNORECASE)
m = p.match(unicode_str)
val = None
if m is not None:
val, _, unit = m.groups()
val = int(val)
if unit:
try:
coeff = _unit_suffixes[unit.upper()]
except KeyError:
raise ValueError(gettext('{0} is not an SI unit.').format(unit))
val *= coeff
return val
def float_from_store(unicode_str):
return float(unicode_str)
# Use an empty dict to initialize an empty configobj avoiding all parsing and
# encoding checks
_list_converter_config = configobj.ConfigObj(
{}, encoding='utf-8', list_values=True, interpolation=False)
class ListOption(Option):
def __init__(self, name, default=None, default_from_env=None,
help=None, invalid=None):
"""A list Option definition.
This overrides the base class so the conversion from a unicode string
can take quoting into account.
"""
super(ListOption, self).__init__(
name, default=default, default_from_env=default_from_env,
from_unicode=self.from_unicode, help=help,
invalid=invalid, unquote=False)
def from_unicode(self, unicode_str):
if not isinstance(unicode_str, string_types):
raise TypeError
# Now inject our string directly as unicode. All callers got their
# value from configobj, so values that need to be quoted are already
# properly quoted.
_list_converter_config.reset()
_list_converter_config._parse([u"list=%s" % (unicode_str,)])
maybe_list = _list_converter_config['list']
if isinstance(maybe_list, string_types):
if maybe_list:
# A single value, most probably the user forgot (or didn't care
# to add) the final ','
l = [maybe_list]
else:
# The empty string, convert to empty list
l = []
else:
# We rely on ConfigObj providing us with a list already
l = maybe_list
return l
class RegistryOption(Option):
"""Option for a choice from a registry."""
def __init__(self, name, registry, default_from_env=None,
help=None, invalid=None):
"""A registry based Option definition.
This overrides the base class so the conversion from a unicode string
can take quoting into account.
"""
super(RegistryOption, self).__init__(
name, default=lambda: unicode(registry.default_key),
default_from_env=default_from_env,
from_unicode=self.from_unicode, help=help,
invalid=invalid, unquote=False)
self.registry = registry
def from_unicode(self, unicode_str):
if not isinstance(unicode_str, string_types):
raise TypeError
try:
return self.registry.get(unicode_str)
except KeyError:
raise ValueError(
"Invalid value %s for %s."
"See help for a list of possible values." % (unicode_str,
self.name))
@property
def help(self):
ret = [self._help, "\n\nThe following values are supported:\n"]
for key in self.registry.keys():
ret.append(" %s - %s\n" % (key, self.registry.get_help(key)))
return "".join(ret)
_option_ref_re = lazy_regex.lazy_compile('({[^\\d\\W](?:\\.\\w|-\\w|\\w)*})')
"""Describes an expandable option reference.
We want to match the most embedded reference first.
I.e. for '{{foo}}' we will get '{foo}',
for '{bar{baz}}' we will get '{baz}'
"""
def iter_option_refs(string):
# Split isolate refs so every other chunk is a ref
is_ref = False
for chunk in _option_ref_re.split(string):
yield is_ref, chunk
is_ref = not is_ref
class OptionRegistry(registry.Registry):
"""Register config options by their name.
This overrides ``registry.Registry`` to simplify registration by acquiring
some information from the option object itself.
"""
def _check_option_name(self, option_name):
"""Ensures an option name is valid.
:param option_name: The name to validate.
"""
if _option_ref_re.match('{%s}' % option_name) is None:
raise IllegalOptionName(option_name)
def register(self, option):
"""Register a new option to its name.
:param option: The option to register. Its name is used as the key.
"""
self._check_option_name(option.name)
super(OptionRegistry, self).register(option.name, option,
help=option.help)
def register_lazy(self, key, module_name, member_name):
"""Register a new option to be loaded on request.
:param key: the key to request the option later. Since the registration
is lazy, it should be provided and match the option name.
:param module_name: the python path to the module. Such as 'os.path'.
:param member_name: the member of the module to return. If empty or
None, get() will return the module itself.
"""
self._check_option_name(key)
super(OptionRegistry, self).register_lazy(key,
module_name, member_name)
def get_help(self, key=None):
"""Get the help text associated with the given key"""
option = self.get(key)
the_help = option.help
if callable(the_help):
return the_help(self, key)
return the_help
option_registry = OptionRegistry()
# Registered options in lexicographical order
option_registry.register(
Option('append_revisions_only',
default=None, from_unicode=bool_from_store, invalid='warning',
help='''\
Whether to only append revisions to the mainline.
If this is set to true, then it is not possible to change the
existing mainline of the branch.
'''))
option_registry.register(
ListOption('acceptable_keys',
default=None,
help="""\
List of GPG key patterns which are acceptable for verification.
"""))
option_registry.register(
Option('add.maximum_file_size',
default=u'20MB', from_unicode=int_SI_from_store,
help="""\
Size above which files should be added manually.
Files below this size are added automatically when using ``bzr add`` without
arguments.
A negative value means disable the size check.
"""))
option_registry.register(
Option('bound',
default=None, from_unicode=bool_from_store,
help="""\
Is the branch bound to ``bound_location``.
If set to "True", the branch should act as a checkout, and push each commit to
the bound_location. This option is normally set by ``bind``/``unbind``.
See also: bound_location.
"""))
option_registry.register(
Option('bound_location',
default=None,
help="""\
The location that commits should go to when acting as a checkout.
This option is normally set by ``bind``.
See also: bound.
"""))
option_registry.register(
Option('branch.fetch_tags', default=False, from_unicode=bool_from_store,
help="""\
Whether revisions associated with tags should be fetched.
"""))
option_registry.register_lazy(
'bzr.transform.orphan_policy', 'breezy.transform', 'opt_transform_orphan')
option_registry.register(
Option('bzr.workingtree.worth_saving_limit', default=10,
from_unicode=int_from_store, invalid='warning',
help='''\
How many changes before saving the dirstate.
-1 means that we will never rewrite the dirstate file for only
stat-cache changes. Regardless of this setting, we will always rewrite
the dirstate file if a file is added/removed/renamed/etc. This flag only
affects the behavior of updating the dirstate file after we notice that
a file has been touched.
'''))
option_registry.register(
Option('bugtracker', default=None,
help='''\
Default bug tracker to use.
This bug tracker will be used for example when marking bugs
as fixed using ``bzr commit --fixes``, if no explicit
bug tracker was specified.
'''))
option_registry.register(
Option('check_signatures', default=CHECK_IF_POSSIBLE,
from_unicode=signature_policy_from_unicode,
help='''\
GPG checking policy.
Possible values: require, ignore, check-available (default)
this option will control whether bzr will require good gpg
signatures, ignore them, or check them if they are
present.
'''))
option_registry.register(
Option('child_submit_format',
help='''The preferred format of submissions to this branch.'''))
option_registry.register(
Option('child_submit_to',
help='''Where submissions to this branch are mailed to.'''))
option_registry.register(
Option('create_signatures', default=SIGN_WHEN_REQUIRED,
from_unicode=signing_policy_from_unicode,
help='''\
GPG Signing policy.
Possible values: always, never, when-required (default)
This option controls whether bzr will always create
gpg signatures or not on commits.
'''))
option_registry.register(
Option('dirstate.fdatasync', default=True,
from_unicode=bool_from_store,
help='''\
Flush dirstate changes onto physical disk?
If true (default), working tree metadata changes are flushed through the
OS buffers to physical disk. This is somewhat slower, but means data
should not be lost if the machine crashes. See also repository.fdatasync.
'''))
option_registry.register(
ListOption('debug_flags', default=[],
help='Debug flags to activate.'))
option_registry.register(
Option('default_format', default='2a',
help='Format used when creating branches.'))
option_registry.register(
Option('dpush_strict', default=None,
from_unicode=bool_from_store,
help='''\
The default value for ``dpush --strict``.
If present, defines the ``--strict`` option default value for checking
uncommitted changes before pushing into a different VCS without any
custom bzr metadata.
'''))
option_registry.register(
Option('editor',
help='The command called to launch an editor to enter a message.'))
option_registry.register(
Option('email', override_from_env=['BRZ_EMAIL'], default=default_email,
help='The users identity'))
option_registry.register(
Option('gpg_signing_key',
default=None,
help="""\
GPG key to use for signing.
This defaults to the first key associated with the users email.
"""))
option_registry.register(
Option('language',
help='Language to translate messages into.'))
option_registry.register(
Option('locks.steal_dead', default=False, from_unicode=bool_from_store,
help='''\
Steal locks that appears to be dead.
If set to True, bzr will check if a lock is supposed to be held by an
active process from the same user on the same machine. If the user and
machine match, but no process with the given PID is active, then bzr
will automatically break the stale lock, and create a new lock for
this process.
Otherwise, bzr will prompt as normal to break the lock.
'''))
option_registry.register(
Option('log_format', default='long',
help= '''\
Log format to use when displaying revisions.
Standard log formats are ``long``, ``short`` and ``line``. Additional formats
may be provided by plugins.
'''))
option_registry.register_lazy('mail_client', 'breezy.mail_client',
'opt_mail_client')
option_registry.register(
Option('output_encoding',
help= 'Unicode encoding for output'
' (terminal encoding if not specified).'))
option_registry.register(
Option('parent_location',
default=None,
help="""\
The location of the default branch for pull or merge.
This option is normally set when creating a branch, the first ``pull`` or by
``pull --remember``.
"""))
option_registry.register(
Option('post_commit', default=None,
help='''\
Post commit functions.
An ordered list of python functions to call, separated by spaces.
Each function takes branch, rev_id as parameters.
'''))
option_registry.register_lazy('progress_bar', 'breezy.ui.text',
'opt_progress_bar')
option_registry.register(
Option('public_branch',
default=None,
help="""\
A publically-accessible version of this branch.
This implies that the branch setting this option is not publically-accessible.
Used and set by ``bzr send``.
"""))
option_registry.register(
Option('push_location',
default=None,
help="""\
The location of the default branch for push.
This option is normally set by the first ``push`` or ``push --remember``.
"""))
option_registry.register(
Option('push_strict', default=None,
from_unicode=bool_from_store,
help='''\
The default value for ``push --strict``.
If present, defines the ``--strict`` option default value for checking
uncommitted changes before sending a merge directive.
'''))
option_registry.register(
Option('repository.fdatasync', default=True,
from_unicode=bool_from_store,
help='''\
Flush repository changes onto physical disk?
If true (default), repository changes are flushed through the OS buffers
to physical disk. This is somewhat slower, but means data should not be
lost if the machine crashes. See also dirstate.fdatasync.
'''))
option_registry.register_lazy('smtp_server',
'breezy.smtp_connection', 'smtp_server')
option_registry.register_lazy('smtp_password',
'breezy.smtp_connection', 'smtp_password')
option_registry.register_lazy('smtp_username',
'breezy.smtp_connection', 'smtp_username')
option_registry.register(
Option('selftest.timeout',
default='600',
from_unicode=int_from_store,
help='Abort selftest if one test takes longer than this many seconds',
))
option_registry.register(
Option('send_strict', default=None,
from_unicode=bool_from_store,
help='''\
The default value for ``send --strict``.
If present, defines the ``--strict`` option default value for checking
uncommitted changes before sending a bundle.
'''))
option_registry.register(
Option('serve.client_timeout',
default=300.0, from_unicode=float_from_store,
help="If we wait for a new request from a client for more than"
" X seconds, consider the client idle, and hangup."))
option_registry.register(
Option('stacked_on_location',
default=None,
help="""The location where this branch is stacked on."""))
option_registry.register(
Option('submit_branch',
default=None,
help="""\
The branch you intend to submit your current work to.
This is automatically set by ``bzr send`` and ``bzr merge``, and is also used
by the ``submit:`` revision spec.
"""))
option_registry.register(
Option('submit_to',
help='''Where submissions from this branch are mailed to.'''))
option_registry.register(
ListOption('suppress_warnings',
default=[],
help="List of warning classes to suppress."))
option_registry.register(
Option('validate_signatures_in_log', default=False,
from_unicode=bool_from_store, invalid='warning',
help='''Whether to validate signatures in brz log.'''))
option_registry.register_lazy('ssl.ca_certs',
'breezy.transport.http._urllib2_wrappers', 'opt_ssl_ca_certs')
option_registry.register_lazy('ssl.cert_reqs',
'breezy.transport.http._urllib2_wrappers', 'opt_ssl_cert_reqs')
class Section(object):
"""A section defines a dict of option name => value.
This is merely a read-only dict which can add some knowledge about the
options. It is *not* a python dict object though and doesn't try to mimic
its API.
"""
def __init__(self, section_id, options):
self.id = section_id
# We re-use the dict-like object received
self.options = options
def get(self, name, default=None, expand=True):
return self.options.get(name, default)
def iter_option_names(self):
for k in self.options.keys():
yield k
def __repr__(self):
# Mostly for debugging use
return "<config.%s id=%s>" % (self.__class__.__name__, self.id)
_NewlyCreatedOption = object()
"""Was the option created during the MutableSection lifetime"""
_DeletedOption = object()
"""Was the option deleted during the MutableSection lifetime"""
class MutableSection(Section):
"""A section allowing changes and keeping track of the original values."""
def __init__(self, section_id, options):
super(MutableSection, self).__init__(section_id, options)
self.reset_changes()
def set(self, name, value):
if name not in self.options:
# This is a new option
self.orig[name] = _NewlyCreatedOption
elif name not in self.orig:
self.orig[name] = self.get(name, None)
self.options[name] = value
def remove(self, name):
if name not in self.orig and name in self.options:
self.orig[name] = self.get(name, None)
del self.options[name]
def reset_changes(self):
self.orig = {}
def apply_changes(self, dirty, store):
"""Apply option value changes.
``self`` has been reloaded from the persistent storage. ``dirty``
contains the changes made since the previous loading.
:param dirty: the mutable section containing the changes.
:param store: the store containing the section
"""
for k, expected in dirty.orig.items():
actual = dirty.get(k, _DeletedOption)
reloaded = self.get(k, _NewlyCreatedOption)
if actual is _DeletedOption:
if k in self.options:
self.remove(k)
else:
self.set(k, actual)
# Report concurrent updates in an ad-hoc way. This should only
# occurs when different processes try to update the same option
# which is not supported (as in: the config framework is not meant
# to be used as a sharing mechanism).
if expected != reloaded:
if actual is _DeletedOption:
actual = '<DELETED>'
if reloaded is _NewlyCreatedOption:
reloaded = '<CREATED>'
if expected is _NewlyCreatedOption:
expected = '<CREATED>'
# Someone changed the value since we get it from the persistent
# storage.
trace.warning(gettext(
"Option {0} in section {1} of {2} was changed"
" from {3} to {4}. The {5} value will be saved.".format(
k, self.id, store.external_url(), expected,
reloaded, actual)))
# No need to keep track of these changes
self.reset_changes()
class Store(object):
"""Abstract interface to persistent storage for configuration options."""
readonly_section_class = Section
mutable_section_class = MutableSection
def __init__(self):
# Which sections need to be saved (by section id). We use a dict here
# so the dirty sections can be shared by multiple callers.
self.dirty_sections = {}
def is_loaded(self):
"""Returns True if the Store has been loaded.
This is used to implement lazy loading and ensure the persistent
storage is queried only when needed.
"""
raise NotImplementedError(self.is_loaded)
def load(self):
"""Loads the Store from persistent storage."""
raise NotImplementedError(self.load)
def _load_from_string(self, bytes):
"""Create a store from a string in configobj syntax.
:param bytes: A string representing the file content.
"""
raise NotImplementedError(self._load_from_string)
def unload(self):
"""Unloads the Store.
This should make is_loaded() return False. This is used when the caller
knows that the persistent storage has changed or may have change since
the last load.
"""
raise NotImplementedError(self.unload)
def quote(self, value):
"""Quote a configuration option value for storing purposes.
This allows Stacks to present values as they will be stored.
"""
return value
def unquote(self, value):
"""Unquote a configuration option value into unicode.
The received value is quoted as stored.
"""
return value
def save(self):
"""Saves the Store to persistent storage."""
raise NotImplementedError(self.save)
def _need_saving(self):
for s in self.dirty_sections.values():
if s.orig:
# At least one dirty section contains a modification
return True
return False
def apply_changes(self, dirty_sections):
"""Apply changes from dirty sections while checking for coherency.
The Store content is discarded and reloaded from persistent storage to
acquire up-to-date values.
Dirty sections are MutableSection which kept track of the value they
are expected to update.
"""
# We need an up-to-date version from the persistent storage, unload the
# store. The reload will occur when needed (triggered by the first
# get_mutable_section() call below.
self.unload()
# Apply the changes from the preserved dirty sections
for section_id, dirty in dirty_sections.items():
clean = self.get_mutable_section(section_id)
clean.apply_changes(dirty, self)
# Everything is clean now
self.dirty_sections = {}
def save_changes(self):
"""Saves the Store to persistent storage if changes occurred.
Apply the changes recorded in the mutable sections to a store content
refreshed from persistent storage.
"""
raise NotImplementedError(self.save_changes)
def external_url(self):
raise NotImplementedError(self.external_url)
def get_sections(self):
"""Returns an ordered iterable of existing sections.
:returns: An iterable of (store, section).
"""
raise NotImplementedError(self.get_sections)
def get_mutable_section(self, section_id=None):
"""Returns the specified mutable section.
:param section_id: The section identifier
"""
raise NotImplementedError(self.get_mutable_section)
def __repr__(self):
# Mostly for debugging use
return "<config.%s(%s)>" % (self.__class__.__name__,
self.external_url())
class CommandLineStore(Store):
"A store to carry command line overrides for the config options."""
def __init__(self, opts=None):
super(CommandLineStore, self).__init__()
if opts is None:
opts = {}
self.options = {}
self.id = 'cmdline'
def _reset(self):
# The dict should be cleared but not replaced so it can be shared.
self.options.clear()
def _from_cmdline(self, overrides):
# Reset before accepting new definitions
self._reset()
for over in overrides:
try:
name, value = over.split('=', 1)
except ValueError:
raise errors.BzrCommandError(
gettext("Invalid '%s', should be of the form 'name=value'")
% (over,))
self.options[name] = value
def external_url(self):
# Not an url but it makes debugging easier and is never needed
# otherwise
return 'cmdline'
def get_sections(self):
yield self, self.readonly_section_class(None, self.options)
class IniFileStore(Store):
"""A config Store using ConfigObj for storage.
:ivar _config_obj: Private member to hold the ConfigObj instance used to
serialize/deserialize the config file.
"""
def __init__(self):
"""A config Store using ConfigObj for storage.
"""
super(IniFileStore, self).__init__()
self._config_obj = None
def is_loaded(self):
return self._config_obj != None
def unload(self):
self._config_obj = None
self.dirty_sections = {}
def _load_content(self):
"""Load the config file bytes.
This should be provided by subclasses
:return: Byte string
"""
raise NotImplementedError(self._load_content)
def _save_content(self, content):
"""Save the config file bytes.
This should be provided by subclasses
:param content: Config file bytes to write
"""
raise NotImplementedError(self._save_content)
def load(self):
"""Load the store from the associated file."""
if self.is_loaded():
return
content = self._load_content()
self._load_from_string(content)
for hook in ConfigHooks['load']:
hook(self)
def _load_from_string(self, bytes):
"""Create a config store from a string.
:param bytes: A string representing the file content.
"""
if self.is_loaded():
raise AssertionError('Already loaded: %r' % (self._config_obj,))
co_input = BytesIO(bytes)
try:
# The config files are always stored utf8-encoded
self._config_obj = ConfigObj(co_input, encoding='utf-8',
list_values=False)
except configobj.ConfigObjError as e:
self._config_obj = None
raise ParseConfigError(e.errors, self.external_url())
except UnicodeDecodeError:
raise ConfigContentError(self.external_url())
def save_changes(self):
if not self.is_loaded():
# Nothing to save
return
if not self._need_saving():
return
# Preserve the current version
dirty_sections = self.dirty_sections.copy()
self.apply_changes(dirty_sections)
# Save to the persistent storage
self.save()
def save(self):
if not self.is_loaded():
# Nothing to save
return
out = BytesIO()
self._config_obj.write(out)
self._save_content(out.getvalue())
for hook in ConfigHooks['save']:
hook(self)
def get_sections(self):
"""Get the configobj section in the file order.
:returns: An iterable of (store, section).
"""
# We need a loaded store
try:
self.load()
except (errors.NoSuchFile, errors.PermissionDenied):
# If the file can't be read, there is no sections
return
cobj = self._config_obj
if cobj.scalars:
yield self, self.readonly_section_class(None, cobj)
for section_name in cobj.sections:
yield (self,
self.readonly_section_class(section_name,
cobj[section_name]))
def get_mutable_section(self, section_id=None):
# We need a loaded store
try:
self.load()
except errors.NoSuchFile:
# The file doesn't exist, let's pretend it was empty
self._load_from_string(b'')
if section_id in self.dirty_sections:
# We already created a mutable section for this id
return self.dirty_sections[section_id]
if section_id is None:
section = self._config_obj
else:
section = self._config_obj.setdefault(section_id, {})
mutable_section = self.mutable_section_class(section_id, section)
# All mutable sections can become dirty
self.dirty_sections[section_id] = mutable_section
return mutable_section
def quote(self, value):
try:
# configobj conflates automagical list values and quoting
self._config_obj.list_values = True
return self._config_obj._quote(value)
finally:
self._config_obj.list_values = False
def unquote(self, value):
if value and isinstance(value, string_types):
# _unquote doesn't handle None nor empty strings nor anything that
# is not a string, really.
value = self._config_obj._unquote(value)
return value
def external_url(self):
# Since an IniFileStore can be used without a file (at least in tests),
# it's better to provide something than raising a NotImplementedError.
# All daughter classes are supposed to provide an implementation
# anyway.
return 'In-Process Store, no URL'
class TransportIniFileStore(IniFileStore):
"""IniFileStore that loads files from a transport.
:ivar transport: The transport object where the config file is located.
:ivar file_name: The config file basename in the transport directory.
"""
def __init__(self, transport, file_name):
"""A Store using a ini file on a Transport
:param transport: The transport object where the config file is located.
:param file_name: The config file basename in the transport directory.
"""
super(TransportIniFileStore, self).__init__()
self.transport = transport
self.file_name = file_name
def _load_content(self):
try:
return self.transport.get_bytes(self.file_name)
except errors.PermissionDenied:
trace.warning("Permission denied while trying to load "
"configuration store %s.", self.external_url())
raise
def _save_content(self, content):
self.transport.put_bytes(self.file_name, content)
def external_url(self):
# FIXME: external_url should really accepts an optional relpath
# parameter (bug #750169) :-/ -- vila 2011-04-04
# The following will do in the interim but maybe we don't want to
# expose a path here but rather a config ID and its associated
# object </hand wawe>.
return urlutils.join(
self.transport.external_url(), urlutils.escape(self.file_name))
# Note that LockableConfigObjStore inherits from ConfigObjStore because we need
# unlockable stores for use with objects that can already ensure the locking
# (think branches). If different stores (not based on ConfigObj) are created,
# they may face the same issue.
class LockableIniFileStore(TransportIniFileStore):
"""A ConfigObjStore using locks on save to ensure store integrity."""
def __init__(self, transport, file_name, lock_dir_name=None):
"""A config Store using ConfigObj for storage.
:param transport: The transport object where the config file is located.
:param file_name: The config file basename in the transport directory.
"""
if lock_dir_name is None:
lock_dir_name = 'lock'
self.lock_dir_name = lock_dir_name
super(LockableIniFileStore, self).__init__(transport, file_name)
self._lock = lockdir.LockDir(self.transport, self.lock_dir_name)
def lock_write(self, token=None):
"""Takes a write lock in the directory containing the config file.
If the directory doesn't exist it is created.
"""
# FIXME: This doesn't check the ownership of the created directories as
# ensure_config_dir_exists does. It should if the transport is local
# -- vila 2011-04-06
self.transport.create_prefix()
token = self._lock.lock_write(token)
return lock.LogicalLockResult(self.unlock, token)
def unlock(self):
self._lock.unlock()
def break_lock(self):
self._lock.break_lock()
def save(self):
with self.lock_write():
# We need to be able to override the undecorated implementation
self.save_without_locking()
def save_without_locking(self):
super(LockableIniFileStore, self).save()
# FIXME: global, breezy, shouldn't that be 'user' instead or even
# 'user_defaults' as opposed to 'user_overrides', 'system_defaults'
# (/etc/bzr/bazaar.conf) and 'system_overrides' ? -- vila 2011-04-05
# FIXME: Moreover, we shouldn't need classes for these stores either, factory
# functions or a registry will make it easier and clearer for tests, focusing
# on the relevant parts of the API that needs testing -- vila 20110503 (based
# on a poolie's remark)
class GlobalStore(LockableIniFileStore):
"""A config store for global options.
There is a single GlobalStore for a given process.
"""
def __init__(self, possible_transports=None):
(path, kind) = _config_dir()
t = transport.get_transport_from_path(
path, possible_transports=possible_transports)
filename = {'bazaar': 'bazaar.conf', 'breezy': 'breezy.conf'}[kind]
super(GlobalStore, self).__init__(t, filename)
self.id = 'breezy'
class LocationStore(LockableIniFileStore):
"""A config store for options specific to a location.
There is a single LocationStore for a given process.
"""
def __init__(self, possible_transports=None):
t = transport.get_transport_from_path(
config_dir(), possible_transports=possible_transports)
super(LocationStore, self).__init__(t, 'locations.conf')
self.id = 'locations'
class BranchStore(TransportIniFileStore):
"""A config store for branch options.
There is a single BranchStore for a given branch.
"""
def __init__(self, branch):
super(BranchStore, self).__init__(branch.control_transport,
'branch.conf')
self.branch = branch
self.id = 'branch'
class ControlStore(LockableIniFileStore):
def __init__(self, bzrdir):
super(ControlStore, self).__init__(bzrdir.transport,
'control.conf',
lock_dir_name='branch_lock')
self.id = 'control'
class SectionMatcher(object):
"""Select sections into a given Store.
This is intended to be used to postpone getting an iterable of sections
from a store.
"""
def __init__(self, store):
self.store = store
def get_sections(self):
# This is where we require loading the store so we can see all defined
# sections.
sections = self.store.get_sections()
# Walk the revisions in the order provided
for store, s in sections:
if self.match(s):
yield store, s
def match(self, section):
"""Does the proposed section match.
:param section: A Section object.
:returns: True if the section matches, False otherwise.
"""
raise NotImplementedError(self.match)
class NameMatcher(SectionMatcher):
def __init__(self, store, section_id):
super(NameMatcher, self).__init__(store)
self.section_id = section_id
def match(self, section):
return section.id == self.section_id
class LocationSection(Section):
def __init__(self, section, extra_path, branch_name=None):
super(LocationSection, self).__init__(section.id, section.options)
self.extra_path = extra_path
if branch_name is None:
branch_name = ''
self.locals = {'relpath': extra_path,
'basename': urlutils.basename(extra_path),
'branchname': branch_name}
def get(self, name, default=None, expand=True):
value = super(LocationSection, self).get(name, default)
if value is not None and expand:
policy_name = self.get(name + ':policy', None)
policy = _policy_value.get(policy_name, POLICY_NONE)
if policy == POLICY_APPENDPATH:
value = urlutils.join(value, self.extra_path)
# expand section local options right now (since POLICY_APPENDPATH
# will never add options references, it's ok to expand after it).
chunks = []
for is_ref, chunk in iter_option_refs(value):
if not is_ref:
chunks.append(chunk)
else:
ref = chunk[1:-1]
if ref in self.locals:
chunks.append(self.locals[ref])
else:
chunks.append(chunk)
value = ''.join(chunks)
return value
class StartingPathMatcher(SectionMatcher):
"""Select sections for a given location respecting the Store order."""
# FIXME: Both local paths and urls can be used for section names as well as
# ``location`` to stay consistent with ``LocationMatcher`` which itself
# inherited the fuzziness from the previous ``LocationConfig``
# implementation. We probably need to revisit which encoding is allowed for
# both ``location`` and section names and how we normalize
# them. http://pad.lv/85479, http://pad.lv/437009 and http://359320 are
# related too. -- vila 2012-01-04
def __init__(self, store, location):
super(StartingPathMatcher, self).__init__(store)
if location.startswith('file://'):
location = urlutils.local_path_from_url(location)
self.location = location
def get_sections(self):
"""Get all sections matching ``location`` in the store.
The most generic sections are described first in the store, then more
specific ones can be provided for reduced scopes.
The returned section are therefore returned in the reversed order so
the most specific ones can be found first.
"""
location_parts = self.location.rstrip('/').split('/')
store = self.store
# Later sections are more specific, they should be returned first
for _, section in reversed(list(store.get_sections())):
if section.id is None:
# The no-name section is always included if present
yield store, LocationSection(section, self.location)
continue
section_path = section.id
if section_path.startswith('file://'):
# the location is already a local path or URL, convert the
# section id to the same format
section_path = urlutils.local_path_from_url(section_path)
if (self.location.startswith(section_path)
or fnmatch.fnmatch(self.location, section_path)):
section_parts = section_path.rstrip('/').split('/')
extra_path = '/'.join(location_parts[len(section_parts):])
yield store, LocationSection(section, extra_path)
class LocationMatcher(SectionMatcher):
def __init__(self, store, location):
super(LocationMatcher, self).__init__(store)
url, params = urlutils.split_segment_parameters(location)
if location.startswith('file://'):
location = urlutils.local_path_from_url(location)
self.location = location
branch_name = params.get('branch')
if branch_name is None:
self.branch_name = urlutils.basename(self.location)
else:
self.branch_name = urlutils.unescape(branch_name)
def _get_matching_sections(self):
"""Get all sections matching ``location``."""
# We slightly diverge from LocalConfig here by allowing the no-name
# section as the most generic one and the lower priority.
no_name_section = None
all_sections = []
# Filter out the no_name_section so _iter_for_location_by_parts can be
# used (it assumes all sections have a name).
for _, section in self.store.get_sections():
if section.id is None:
no_name_section = section
else:
all_sections.append(section)
# Unfortunately _iter_for_location_by_parts deals with section names so
# we have to resync.
filtered_sections = _iter_for_location_by_parts(
[s.id for s in all_sections], self.location)
iter_all_sections = iter(all_sections)
matching_sections = []
if no_name_section is not None:
matching_sections.append(
(0, LocationSection(no_name_section, self.location)))
for section_id, extra_path, length in filtered_sections:
# a section id is unique for a given store so it's safe to take the
# first matching section while iterating. Also, all filtered
# sections are part of 'all_sections' and will always be found
# there.
while True:
section = next(iter_all_sections)
if section_id == section.id:
section = LocationSection(section, extra_path,
self.branch_name)
matching_sections.append((length, section))
break
return matching_sections
def get_sections(self):
# Override the default implementation as we want to change the order
# We want the longest (aka more specific) locations first
sections = sorted(self._get_matching_sections(),
key=lambda match: (match[0], match[1].id),
reverse=True)
# Sections mentioning 'ignore_parents' restrict the selection
for _, section in sections:
# FIXME: We really want to use as_bool below -- vila 2011-04-07
ignore = section.get('ignore_parents', None)
if ignore is not None:
ignore = ui.bool_from_string(ignore)
if ignore:
break
# Finally, we have a valid section
yield self.store, section
# FIXME: _shared_stores should be an attribute of a library state once a
# library_state object is always available.
_shared_stores = {}
_shared_stores_at_exit_installed = False
class Stack(object):
"""A stack of configurations where an option can be defined"""
def __init__(self, sections_def, store=None, mutable_section_id=None):
"""Creates a stack of sections with an optional store for changes.
:param sections_def: A list of Section or callables that returns an
iterable of Section. This defines the Sections for the Stack and
can be called repeatedly if needed.
:param store: The optional Store where modifications will be
recorded. If none is specified, no modifications can be done.
:param mutable_section_id: The id of the MutableSection where changes
are recorded. This requires the ``store`` parameter to be
specified.
"""
self.sections_def = sections_def
self.store = store
self.mutable_section_id = mutable_section_id
def iter_sections(self):
"""Iterate all the defined sections."""
# Ensuring lazy loading is achieved by delaying section matching (which
# implies querying the persistent storage) until it can't be avoided
# anymore by using callables to describe (possibly empty) section
# lists.
for sections in self.sections_def:
for store, section in sections():
yield store, section
def get(self, name, expand=True, convert=True):
"""Return the *first* option value found in the sections.
This is where we guarantee that sections coming from Store are loaded
lazily: the loading is delayed until we need to either check that an
option exists or get its value, which in turn may require to discover
in which sections it can be defined. Both of these (section and option
existence) require loading the store (even partially).
:param name: The queried option.
:param expand: Whether options references should be expanded.
:param convert: Whether the option value should be converted from
unicode (do nothing for non-registered options).
:returns: The value of the option.
"""
# FIXME: No caching of options nor sections yet -- vila 20110503
value = None
found_store = None # Where the option value has been found
# If the option is registered, it may provide additional info about
# value handling
try:
opt = option_registry.get(name)
except KeyError:
# Not registered
opt = None
def expand_and_convert(val):
# This may need to be called in different contexts if the value is
# None or ends up being None during expansion or conversion.
if val is not None:
if expand:
if isinstance(val, string_types):
val = self._expand_options_in_string(val)
else:
trace.warning('Cannot expand "%s":'
' %s does not support option expansion'
% (name, type(val)))
if opt is None:
val = found_store.unquote(val)
elif convert:
val = opt.convert_from_unicode(found_store, val)
return val
# First of all, check if the environment can override the configuration
# value
if opt is not None and opt.override_from_env:
value = opt.get_override()
value = expand_and_convert(value)
if value is None:
for store, section in self.iter_sections():
value = section.get(name)
if value is not None:
found_store = store
break
value = expand_and_convert(value)
if opt is not None and value is None:
# If the option is registered, it may provide a default value
value = opt.get_default()
value = expand_and_convert(value)
for hook in ConfigHooks['get']:
hook(self, name, value)
return value
def expand_options(self, string, env=None):
"""Expand option references in the string in the configuration context.
:param string: The string containing option(s) to expand.
:param env: An option dict defining additional configuration options or
overriding existing ones.
:returns: The expanded string.
"""
return self._expand_options_in_string(string, env)
def _expand_options_in_string(self, string, env=None, _refs=None):
"""Expand options in the string in the configuration context.
:param string: The string to be expanded.
:param env: An option dict defining additional configuration options or
overriding existing ones.
:param _refs: Private list (FIFO) containing the options being expanded
to detect loops.
:returns: The expanded string.
"""
if string is None:
# Not much to expand there
return None
if _refs is None:
# What references are currently resolved (to detect loops)
_refs = []
result = string
# We need to iterate until no more refs appear ({{foo}} will need two
# iterations for example).
expanded = True
while expanded:
expanded = False
chunks = []
for is_ref, chunk in iter_option_refs(result):
if not is_ref:
chunks.append(chunk)
else:
expanded = True
name = chunk[1:-1]
if name in _refs:
raise OptionExpansionLoop(string, _refs)
_refs.append(name)
value = self._expand_option(name, env, _refs)
if value is None:
raise ExpandingUnknownOption(name, string)
chunks.append(value)
_refs.pop()
result = ''.join(chunks)
return result
def _expand_option(self, name, env, _refs):
if env is not None and name in env:
# Special case, values provided in env takes precedence over
# anything else
value = env[name]
else:
value = self.get(name, expand=False, convert=False)
value = self._expand_options_in_string(value, env, _refs)
return value
def _get_mutable_section(self):
"""Get the MutableSection for the Stack.
This is where we guarantee that the mutable section is lazily loaded:
this means we won't load the corresponding store before setting a value
or deleting an option. In practice the store will often be loaded but
this helps catching some programming errors.
"""
store = self.store
section = store.get_mutable_section(self.mutable_section_id)
return store, section
def set(self, name, value):
"""Set a new value for the option."""
store, section = self._get_mutable_section()
section.set(name, store.quote(value))
for hook in ConfigHooks['set']:
hook(self, name, value)
def remove(self, name):
"""Remove an existing option."""
_, section = self._get_mutable_section()
section.remove(name)
for hook in ConfigHooks['remove']:
hook(self, name)
def __repr__(self):
# Mostly for debugging use
return "<config.%s(%s)>" % (self.__class__.__name__, id(self))
def _get_overrides(self):
if breezy._global_state is not None:
# TODO(jelmer): Urgh, this is circular so we can't call breezy.get_global_state()
return breezy._global_state.cmdline_overrides.get_sections()
return []
def get_shared_store(self, store, state=None):
"""Get a known shared store.
Store urls uniquely identify them and are used to ensure a single copy
is shared across all users.
:param store: The store known to the caller.
:param state: The library state where the known stores are kept.
:returns: The store received if it's not a known one, an already known
otherwise.
"""
if state is None:
# TODO(jelmer): Urgh, this is circular so we can't call breezy.get_global_state()
state = breezy._global_state
if state is None:
global _shared_stores_at_exit_installed
stores = _shared_stores
def save_config_changes():
for k, store in stores.items():
store.save_changes()
if not _shared_stores_at_exit_installed:
# FIXME: Ugly hack waiting for library_state to always be
# available. -- vila 20120731
import atexit
atexit.register(save_config_changes)
_shared_stores_at_exit_installed = True
else:
stores = state.config_stores
url = store.external_url()
try:
return stores[url]
except KeyError:
stores[url] = store
return store
class MemoryStack(Stack):
"""A configuration stack defined from a string.
This is mainly intended for tests and requires no disk resources.
"""
def __init__(self, content=None):
"""Create an in-memory stack from a given content.
It uses a single store based on configobj and support reading and
writing options.
:param content: The initial content of the store. If None, the store is
not loaded and ``_load_from_string`` can and should be used if
needed.
"""
store = IniFileStore()
if content is not None:
store._load_from_string(content)
super(MemoryStack, self).__init__(
[store.get_sections], store)
class _CompatibleStack(Stack):
"""Place holder for compatibility with previous design.
This is intended to ease the transition from the Config-based design to the
Stack-based design and should not be used nor relied upon by plugins.
One assumption made here is that the daughter classes will all use Stores
derived from LockableIniFileStore).
It implements set() and remove () by re-loading the store before applying
the modification and saving it.
The long term plan being to implement a single write by store to save
all modifications, this class should not be used in the interim.
"""
def set(self, name, value):
# Force a reload
self.store.unload()
super(_CompatibleStack, self).set(name, value)
# Force a write to persistent storage
self.store.save()
def remove(self, name):
# Force a reload
self.store.unload()
super(_CompatibleStack, self).remove(name)
# Force a write to persistent storage
self.store.save()
class GlobalStack(Stack):
"""Global options only stack.
The following sections are queried:
* command-line overrides,
* the 'DEFAULT' section in bazaar.conf
This stack will use the ``DEFAULT`` section in bazaar.conf as its
MutableSection.
"""
def __init__(self):
gstore = self.get_shared_store(GlobalStore())
super(GlobalStack, self).__init__(
[self._get_overrides,
NameMatcher(gstore, 'DEFAULT').get_sections],
gstore, mutable_section_id='DEFAULT')
class LocationStack(Stack):
"""Per-location options falling back to global options stack.
The following sections are queried:
* command-line overrides,
* the sections matching ``location`` in ``locations.conf``, the order being
defined by the number of path components in the section glob, higher
numbers first (from most specific section to most generic).
* the 'DEFAULT' section in bazaar.conf
This stack will use the ``location`` section in locations.conf as its
MutableSection.
"""
def __init__(self, location):
"""Make a new stack for a location and global configuration.
:param location: A URL prefix to """
lstore = self.get_shared_store(LocationStore())
if location.startswith('file://'):
location = urlutils.local_path_from_url(location)
gstore = self.get_shared_store(GlobalStore())
super(LocationStack, self).__init__(
[self._get_overrides,
LocationMatcher(lstore, location).get_sections,
NameMatcher(gstore, 'DEFAULT').get_sections],
lstore, mutable_section_id=location)
class BranchStack(Stack):
"""Per-location options falling back to branch then global options stack.
The following sections are queried:
* command-line overrides,
* the sections matching ``location`` in ``locations.conf``, the order being
defined by the number of path components in the section glob, higher
numbers first (from most specific section to most generic),
* the no-name section in branch.conf,
* the ``DEFAULT`` section in ``bazaar.conf``.
This stack will use the no-name section in ``branch.conf`` as its
MutableSection.
"""
def __init__(self, branch):
lstore = self.get_shared_store(LocationStore())
bstore = branch._get_config_store()
gstore = self.get_shared_store(GlobalStore())
super(BranchStack, self).__init__(
[self._get_overrides,
LocationMatcher(lstore, branch.base).get_sections,
NameMatcher(bstore, None).get_sections,
NameMatcher(gstore, 'DEFAULT').get_sections],
bstore)
self.branch = branch
def lock_write(self, token=None):
return self.branch.lock_write(token)
def unlock(self):
return self.branch.unlock()
def set(self, name, value):
with self.lock_write():
super(BranchStack, self).set(name, value)
# Unlocking the branch will trigger a store.save_changes() so the
# last unlock saves all the changes.
def remove(self, name):
with self.lock_write():
super(BranchStack, self).remove(name)
# Unlocking the branch will trigger a store.save_changes() so the
# last unlock saves all the changes.
class RemoteControlStack(Stack):
"""Remote control-only options stack."""
# FIXME 2011-11-22 JRV This should probably be renamed to avoid confusion
# with the stack used for remote bzr dirs. RemoteControlStack only uses
# control.conf and is used only for stack options.
def __init__(self, bzrdir):
cstore = bzrdir._get_config_store()
super(RemoteControlStack, self).__init__(
[NameMatcher(cstore, None).get_sections],
cstore)
self.controldir = bzrdir
class BranchOnlyStack(Stack):
"""Branch-only options stack."""
# FIXME: _BranchOnlyStack only uses branch.conf and is used only for the
# stacked_on_location options waiting for http://pad.lv/832042 to be fixed.
# -- vila 2011-12-16
def __init__(self, branch):
bstore = branch._get_config_store()
super(BranchOnlyStack, self).__init__(
[NameMatcher(bstore, None).get_sections],
bstore)
self.branch = branch
def lock_write(self, token=None):
return self.branch.lock_write(token)
def unlock(self):
return self.branch.unlock()
def set(self, name, value):
with self.lock_write():
super(BranchOnlyStack, self).set(name, value)
# Force a write to persistent storage
self.store.save_changes()
def remove(self, name):
with self.lock_write():
super(BranchOnlyStack, self).remove(name)
# Force a write to persistent storage
self.store.save_changes()
class cmd_config(commands.Command):
__doc__ = """Display, set or remove a configuration option.
Display the active value for option NAME.
If --all is specified, NAME is interpreted as a regular expression and all
matching options are displayed mentioning their scope and without resolving
option references in the value). The active value that bzr will take into
account is the first one displayed for each option.
If NAME is not given, --all .* is implied (all options are displayed for the
current scope).
Setting a value is achieved by using NAME=value without spaces. The value
is set in the most relevant scope and can be checked by displaying the
option again.
Removing a value is achieved by using --remove NAME.
"""
takes_args = ['name?']
takes_options = [
'directory',
# FIXME: This should be a registry option so that plugins can register
# their own config files (or not) and will also address
# http://pad.lv/788991 -- vila 20101115
commands.Option('scope', help='Reduce the scope to the specified'
' configuration file.',
type=text_type),
commands.Option('all',
help='Display all the defined values for the matching options.',
),
commands.Option('remove', help='Remove the option from'
' the configuration file.'),
]
_see_also = ['configuration']
@commands.display_command
def run(self, name=None, all=False, directory=None, scope=None,
remove=False):
if directory is None:
directory = '.'
directory = directory_service.directories.dereference(directory)
directory = urlutils.normalize_url(directory)
if remove and all:
raise errors.BzrError(
'--all and --remove are mutually exclusive.')
elif remove:
# Delete the option in the given scope
self._remove_config_option(name, directory, scope)
elif name is None:
# Defaults to all options
self._show_matching_options('.*', directory, scope)
else:
try:
name, value = name.split('=', 1)
except ValueError:
# Display the option(s) value(s)
if all:
self._show_matching_options(name, directory, scope)
else:
self._show_value(name, directory, scope)
else:
if all:
raise errors.BzrError(
'Only one option can be set.')
# Set the option value
self._set_config_option(name, value, directory, scope)
def _get_stack(self, directory, scope=None, write_access=False):
"""Get the configuration stack specified by ``directory`` and ``scope``.
:param directory: Where the configurations are derived from.
:param scope: A specific config to start from.
:param write_access: Whether a write access to the stack will be
attempted.
"""
# FIXME: scope should allow access to plugin-specific stacks (even
# reduced to the plugin-specific store), related to
# http://pad.lv/788991 -- vila 2011-11-15
if scope is not None:
if scope == 'breezy':
return GlobalStack()
elif scope == 'locations':
return LocationStack(directory)
elif scope == 'branch':
(_, br, _) = (
controldir.ControlDir.open_containing_tree_or_branch(
directory))
if write_access:
self.add_cleanup(br.lock_write().unlock)
return br.get_config_stack()
raise NoSuchConfig(scope)
else:
try:
(_, br, _) = (
controldir.ControlDir.open_containing_tree_or_branch(
directory))
if write_access:
self.add_cleanup(br.lock_write().unlock)
return br.get_config_stack()
except errors.NotBranchError:
return LocationStack(directory)
def _quote_multiline(self, value):
if '\n' in value:
value = '"""' + value + '"""'
return value
def _show_value(self, name, directory, scope):
conf = self._get_stack(directory, scope)
value = conf.get(name, expand=True, convert=False)
if value is not None:
# Quote the value appropriately
value = self._quote_multiline(value)
self.outf.write('%s\n' % (value,))
else:
raise NoSuchConfigOption(name)
def _show_matching_options(self, name, directory, scope):
name = lazy_regex.lazy_compile(name)
# We want any error in the regexp to be raised *now* so we need to
# avoid the delay introduced by the lazy regexp. But, we still do
# want the nicer errors raised by lazy_regex.
name._compile_and_collapse()
cur_store_id = None
cur_section = None
conf = self._get_stack(directory, scope)
for store, section in conf.iter_sections():
for oname in section.iter_option_names():
if name.search(oname):
if cur_store_id != store.id:
# Explain where the options are defined
self.outf.write('%s:\n' % (store.id,))
cur_store_id = store.id
cur_section = None
if (section.id is not None and cur_section != section.id):
# Display the section id as it appears in the store
# (None doesn't appear by definition)
self.outf.write(' [%s]\n' % (section.id,))
cur_section = section.id
value = section.get(oname, expand=False)
# Quote the value appropriately
value = self._quote_multiline(value)
self.outf.write(' %s = %s\n' % (oname, value))
def _set_config_option(self, name, value, directory, scope):
conf = self._get_stack(directory, scope, write_access=True)
conf.set(name, value)
# Explicitly save the changes
conf.store.save_changes()
def _remove_config_option(self, name, directory, scope):
if name is None:
raise errors.BzrCommandError(
'--remove expects an option to remove.')
conf = self._get_stack(directory, scope, write_access=True)
try:
conf.remove(name)
# Explicitly save the changes
conf.store.save_changes()
except KeyError:
raise NoSuchConfigOption(name)
# Test registries
#
# We need adapters that can build a Store or a Stack in a test context. Test
# classes, based on TestCaseWithTransport, can use the registry to parametrize
# themselves. The builder will receive a test instance and should return a
# ready-to-use store or stack. Plugins that define new store/stacks can also
# register themselves here to be tested against the tests defined in
# breezy.tests.test_config. Note that the builder can be called multiple times
# for the same test.
# The registered object should be a callable receiving a test instance
# parameter (inheriting from tests.TestCaseWithTransport) and returning a Store
# object.
test_store_builder_registry = registry.Registry()
# The registered object should be a callable receiving a test instance
# parameter (inheriting from tests.TestCaseWithTransport) and returning a Stack
# object.
test_stack_builder_registry = registry.Registry()
|