/usr/share/singular/LIB/tasks.lib is in singular-data 4.0.3+ds-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 | //////////////////////////////////////////////////////////////////////
version="version tasks.lib 4.0.0.0 Dec_2013 "; // $Id: 9c9ecb6bd816d6b914cdf4a7a223d8aa98905ed0 $
category="General purpose";
info="
LIBRARY: tasks.lib A parallel framework based on tasks
AUTHOR: Andreas Steenpass, e-mail: steenpass@mathematik.uni-kl.de
OVERVIEW:
This library provides a parallel framework based on tasks. It introduces a new
Singular type @code{task}; an object of this type is a command (given by a
string) applied to a list of arguments. Tasks can be computed in parallel via
the procedures in this library and they can even be started recursively, i.e.
from within other tasks.
tasks.lib respects the limits for computational resources defined
in @ref{resources_lib}, i.e., all tasks within the same Singular session will
not use more computational resources than provided via resources.lib, even if
tasks are started recursively.
The Singular library @ref{parallel_lib} provides implementations of several
parallel 'skeletons' based on tasks.lib.
KEYWORDS: parallelization; distributed computing; task
SEE ALSO: resources_lib, parallel_lib
PROCEDURES:
createTask(); create a task
killTask(); kill a task
copyTask(); copy a task
compareTasks(); compare two tasks
printTask(); print a task
startTasks(); start tasks
stopTask(); stop a task
waitTasks(); wait for a certain number of tasks
waitAllTasks(); wait for all tasks
pollTask(); poll a task
getCommand(); get the command of a task
getArguments(); get the arguments of a task
getResult(); get the result of a task
getState(); get the state of a task
";
/*
RATIONALE FOR DEVELOPERS
The Singular type 'task'
------------------------
tasks.lib introduces a Singular type 'task' which makes use of a pointer-like
model in order to avoid unnecessary copying of data. 'task' is defined as a
newstruct whose only member is 'int index'. This index points to an entry in
the lib-internal list 'tasks'. The elements of this list are of the type
'internal_task' which is defined as a newstruct with the following members:
int id - the internal ID
string command - the command
list arguments - the arguments
def result - the result
string state - the current state, see 'The life cycle of a task'
list links - control handles, see 'Links'
int linkID - the ID of the control handles
The life cycle of a task
------------------------
'uninitialized' --> 'created' --> 'started' --> 'completed'
| ^
v |
'stopped'
The state of a task t is 'uninitialized' iff
(t.index == 0) or (typeof(tasks[t.index]) != "internal_task").
A task can be reset to 'uninitialized' by killTask() at any time.
Assigned members for 'internal_task'
------------------------------------
For each state, the following members of an internal_task must be assigned:
created: command arguments state
started: id command arguments state links linkID
stopped: command arguments state
completed: command arguments result state
All other members should be wiped out.
Local supervisors
-----------------
A call of 'startTasks(t(1..n));' for tasks t(1), ..., t(n) creates a child
process which plays the role of a supervisor for these tasks. The computation
of the tasks is done in child processes of the supervisor.
The supervisor assigns an internal state to each task which is represented by
an integer. The meaning of these integers and their relation to the global
state of each task is as follows:
internal state | meaning | corresponding global state
---------------|-------------------|---------------------------
0 | waiting | started
1 | started | started
2 | (result) computed | started
3 | (result) sent | completed
-1 | stopped | stopped
Links
-----
The ssi link between the main process and the supervisor is named 'l(pid)'
where pid is the PID of the main process. The links between the supervisor and
its child processes are named 'll(pid)(1)', 'll(pid)(2)', ... where pid is the
PID of the supervisor. The link between the child processes of the supervisor
and the main process is named 'L(pid)' where pid is the PID of the main
process. This link is only for sending the results to the main process and must
not be used in the other direction!
For any task t whose state is 'started', tasks[t.index].links is
list(L(pid), l(pid)) where pid is the PID of the main process.
Communication model
-------------------
stopTask() <--> supervisor
0, id -->
waitTasks() <--> supervisor
(demanded_task is an intvec containing the IDs of the tasks which are being
waited for; ndemanded is the number of tasks that is being waited for.)
1, demanded_tasks, ndemanded -->
[receiving results]
1, 0:2, -1 -->
results_sent <--
[receiving remaining results]
pollTask() <--> supervisor
2, id -->
state <--
[receive result if state == 2 (computed)]
startTasks_child() <--> startTasks_grandchild()
[compute the result]
1, id <--
[wait until the result is requested]
1 -->
[send the result]
2 <--
sending and receiving results:
main process <--> supervisor <--> startTasks_grandchild()
[request the result, see above]
index, result (main process <-- startTasks_grandchild())
3, id (main process --> supervisor)
*/
LIB "resources.lib";
static proc mod_init()
{
/* initialize the semaphores */
if (!defined(Resources)) {
LIB "resources.lib";
}
// the number of processor cores
int sem_cores = Resources::sem_cores;
exportto(Tasks, sem_cores);
// the number of leaves in the parallel tree (not strict)
int sem_leaves = semaphore(system("--cpus")+10);
exportto(Tasks, sem_leaves);
// the number of processes waiting for sem_cores with low priority
int sem_queue = semaphore(2);
exportto(Tasks, sem_queue);
/* define the Singular type 'task' */
newstruct("task", "int index");
newstruct("internal_task", "int id, string command, list arguments,"
+"def result, string state, list links, int linkID");
system("install", "task", "=", createTask, 1);
system("install", "task", "==", compareTasks, 2);
system("install", "task", "print", printTask, 1);
/* define (lib-)global variables */
list tasks; // the lib-internal list of tasks
exportto(Tasks, tasks);
int ntasks; // the current maximal index in 'tasks'
exportto(Tasks, ntasks);
int nlinkIDs; // the current maximal linkID
exportto(Tasks, nlinkIDs);
}
proc createTask(alias string command, alias list arguments)
"USAGE: createTask(command, arguments), command string, arguments list
RETURN: a task with the given command and arguments whose state is 'created'.
NOTE: 't = command, arguments;' is a shortcut for
't = createTask(command, arguments);'.
SEE ALSO: startTasks, getCommand, getArguments, getState, killTask, copyTask,
compareTasks, printTask
EXAMPLE: example createTask; shows an example"
{
internal_task T;
ntasks++;
tasks[ntasks] = T;
tasks[ntasks].command = command;
tasks[ntasks].arguments = arguments;
tasks[ntasks].state = "created";
task t;
t.index = ntasks;
return(t);
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t = createTask("std", list(I));
// This is the same as:
// task t = "std", list(I);
t;
killTask(t);
}
proc killTask(task t)
"USAGE: killTask(t), t task
RETURN: nothing. If the state of t is 'started', then t is stopped first. The
internal data structures of t are erased and its state is set to
'uninitialized'.
NOTE: 'killTask(t);' is not the same as 'kill t;'. The latter command does
not erase the internal data structures of t. Hence killTask() should
be called for any no longer needed task in order to free memory.
SEE ALSO: stopTask, getState, createTask, printTask
EXAMPLE: example killTask; shows an example"
{
if (t.index == 0) {
return();
}
if (typeof(tasks[t.index]) != "internal_task") {
return();
}
if (tasks[t.index].state == "started") {
stopTask(t);
}
tasks[t.index] = def(0);
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t = "std", list(I);
startTasks(t);
t;
killTask(t);
t;
getState(t);
}
proc copyTask(task t)
"USAGE: copyTask(t), t task
RETURN: a copy of t.
NOTE: 'task t1 = copyTask(t2);' is not the same as 'task t1 = t2;'. After
the latter command, t1 points to the same object as t2; any changes
to t2 will also effect t1. In contrast to this, copyTask() creates a
new independend task.
@* A task whose state is 'started' cannot be copied.
SEE ALSO: getCommand, getArguments, getResult, getState, createTask, killTask,
compareTasks, printTask
EXAMPLE: example copyTask; shows an example"
{
task t_copy;
if (t.index == 0) {
return(t_copy);
}
if (typeof(tasks[t.index]) != "internal_task") {
return(t_copy);
}
if (tasks[t.index].state == "started") {
ERROR("cannot copy a task whose state is 'started'");
}
ntasks++;
tasks[ntasks] = tasks[t.index];
t_copy.index = ntasks;
return(t_copy);
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t1 = "std", list(I);
startTasks(t1);
waitAllTasks(t1);
task t2 = copyTask(t1);
killTask(t1);
t2; // t2 survived
getResult(t2);
killTask(t2);
}
proc compareTasks(task t1, task t2)
"USAGE: compareTasks(t1, t2), t1, t2 tasks
RETURN: 1, if t1 and t2 coincide;
0, otherwise.
NOTE: The arguments and the results of t1 and t2 are not compared.
@* 't1 == t2' is a shortcut for 'compareTasks(t1, t2)'.
SEE ALSO: getCommand, getArguments, getResult, getState, copyTask, printTask
EXAMPLE: example compareTasks; shows an example"
{
if (tasks[t1.index].id != tasks[t2.index].id) {
return(0);
}
if (tasks[t1.index].command != tasks[t2.index].command) {
return(0);
}
if (tasks[t1.index].state != tasks[t2.index].state) {
return(0);
}
if (tasks[t1.index].linkID != tasks[t2.index].linkID) {
return(0);
}
return(1);
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t1 = "std", list(I);
task t2 = "std", list(I);
compareTasks(t1, t2);
startTasks(t1);
waitAllTasks(t1);
t1 == t2; // the same as compareTasks(t1, t2);
killTask(t1);
killTask(t2);
// The arguments and the result are not compared!
ideal J = x;
task t3 = "std", list(I);
task t4 = "std", list(J);
t3 == t4;
killTask(t3);
killTask(t4);
}
proc printTask(task t)
"USAGE: printTask(t), t task
RETURN: nothing. Prints information about t.
NOTE: 'print(t);' and 't;' are shortcuts for 'printTask(t)'.
SEE ALSO: getCommand, getArguments, getResult, getState, createTask, killTask
EXAMPLE: example printTask; shows an example"
{
if (t.index == 0) {
"An uninitialized task";
return();
}
if (typeof(tasks[t.index]) != "internal_task") {
"An uninitialized task";
return();
}
"A task with the following properties:"+newline
+"command: "+tasks[t.index].command+newline
+"no. of arguments: "+string(size(tasks[t.index].arguments))+newline
+"state: "+tasks[t.index].state;
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t;
printTask(t);
t = "std", list(I);
t; // the same as printTask(t);
startTasks(t);
waitAllTasks(t);
t;
killTask(t);
}
proc startTasks(list #)
"USAGE: startTasks(t1, t2, ...), t1, t2, ... tasks
RETURN: nothing. Starts the tasks t1, t2, ... and sets their states to
'started'.
NOTE: A task whose state is neither 'created' nor 'stopped' cannot be
started.
@* If startTasks() is applied to a task whose state is 'stopped', then
the computation of this task will be restarted from the beginning.
@* Tasks can be started from within other tasks. A started task should
not be accessed from within any task other than the one within which
it was started.
@* For each task, the start of its computation is subject to the
internal scheduling.
SEE ALSO: stopTask, waitTasks, pollTask, getState, createTask, printTask
EXAMPLE: example startTasks; shows an example"
{
int nargs = size(#);
if (nargs == 0) {
ERROR("missing argument");
}
int i;
for (i = nargs; i > 0; i--) {
if (typeof(#[i]) != "task") {
ERROR("argument not of type 'task' (argument no. "+string(i)+")");
}
if (#[i].index == 0) {
ERROR("cannot start an uninitialized task (task no. "
+string(i)+")");
}
if (typeof(tasks[#[i].index]) != "internal_task") {
ERROR("cannot start an uninitialized task (task no. "
+string(i)+")");
}
if (tasks[#[i].index].state != "created"
&& tasks[#[i].index].state != "stopped") {
ERROR("cannot start a task whose state is not"+newline
+"'created' or 'stopped'");
}
}
for (i = nargs; i > 0; i--) {
tasks[#[i].index].id = i; // has to be set before forking
tasks[#[i].index].state = "started";
}
int pid = system("pid");
link l(pid) = "ssi:fork";
open(l(pid));
write(l(pid), quote(startTasks_child(#, eval(pid))));
int port = read(l(pid));
link L(pid) = "ssi:connect localhost:"+string(port);
open(L(pid));
nlinkIDs++;
for (i = nargs; i > 0; i--) {
tasks[#[i].index].links = list(L(pid), l(pid));
tasks[#[i].index].linkID = nlinkIDs;
}
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t1 = "std", list(I);
task t2 = "slimgb", list(I);
startTasks(t1, t2);
waitAllTasks(t1, t2);
getResult(t1);
getResult(t2);
killTask(t1);
killTask(t2);
}
/* This procedure is started within the child after forking. */
static proc startTasks_child(list localtasks, int pid_parent)
{
int port = system("reserve", 1);
write(l(pid_parent), port);
link L(pid_parent) = system("reservedLink");
export(L(pid_parent));
int sem_write = semaphore(1);
int pid = system("pid");
int nlocaltasks = size(localtasks);
intvec state = 0:nlocaltasks;
// the internal state of each localtask (see rationale)
int nwaiting = nlocaltasks;
// the number of local tasks with internal state 0 (waiting)
int nfinished;
// the number of local tasks with internal state 3 (result sent) or
// -1 (stopped)
intvec queue = 1..nlocaltasks;
int next = 1;
list links;
links[nlocaltasks+1] = l(pid_parent);
intvec assignment = 0:nlocaltasks;
// the task with id = i is running in link no. assignment[i]
int nlinks;
// data sent by other processes
int code;
int id;
if (!defined(demanded_tasks)) {
intvec demanded_tasks;
int demanded_tasks_index = 1;
exportto(Tasks, demanded_tasks);
exportto(Tasks, demanded_tasks_index);
}
else {
demanded_tasks = 0;
demanded_tasks_index = 1;
}
int ndemanded = -1;
// internal counts
int granted_leaves;
int results_sent;
// auxiliary variables
intvec waiting_tasks;
int wait;
int deadlock;
int tmp;
int i;
int j;
while (nwaiting > 0) {
wait = 0;
if (nlinks == 0) {
wait = -1;
granted_leaves++;
while (-wait < nwaiting) {
if (system("semaphore", "try_acquire", sem_leaves) == 1) {
wait--;
}
else {
break;
}
}
}
while (wait == 0) {
wait = waitfirst(links, 500);
if (wait == 0) {
while (-wait < nwaiting) {
if (system("semaphore", "try_acquire", sem_leaves) == 1) {
wait--;
}
else {
break;
}
}
}
}
if (wait < 0) { // open (-wait) new links
while (wait < 0) {
wait++;
nlinks++;
link ll(pid)(nlinks) = "ssi:fork";
open(ll(pid)(nlinks));
links[nlinks] = ll(pid)(nlinks);
write(links[nlinks],
quote(startTasks_grandchild(
eval(localtasks[queue[next]].index), eval(pid_parent),
eval(pid), eval(nlinks), eval(sem_write))));
assignment[queue[next]] = nlinks;
state[queue[next]] = 1;
nwaiting--;
next++;
}
// wait == 0
}
if (wait > 0 && wait <= nlocaltasks) {
code = read(links[wait]);
if (code == 1) { // result computed
id = read(links[wait]);
state[id] = 2;
if (ndemanded > 0 && removeDemanded(id)) {
write(links[wait], 1);
ndemanded--;
results_sent++;
}
}
// code == 2: startTasks_grandchild() ended, do nothing
}
if (wait == nlocaltasks+1) {
code = read(l(pid_parent));
if (code == 0) { // stopTask
id = read(l(pid_parent));
if (state[id] == 0) { // waiting
queue = give_priority(queue, intvec(id));
next++;
}
if (state[id] == 1 || state[id] == 2) { // started or computed
close(links[assignment[id]]);
open(links[assignment[id]]);
write(links[assignment[id]],
quote(startTasks_grandchild(
eval(localtasks[queue[next]].index), eval(pid_parent),
eval(pid), eval(assignment[id]), eval(sem_write))));
assignment[queue[next]] = assignment[id];
assignment[id] = 0;
state[queue[next]] = 1;
next++;
}
// state[id] == -1 (stopped) or state[id] == 3 (sent)
// should not happen
nwaiting--;
nfinished++;
state[id] = -1;
}
if (code == 1) { // waitTasks
demanded_tasks = read(l(pid_parent));
demanded_tasks_index = size(demanded_tasks);
ndemanded = read(l(pid_parent));
if (ndemanded > demanded_tasks_index) {
ndemanded = demanded_tasks_index;
}
if (demanded_tasks == 0 && ndemanded == -1) {
write(l(pid_parent), results_sent);
continue;
}
else {
results_sent = 0;
}
demanded_tasks = demanded_tasks[demanded_tasks_index..1];
deadlock = 0;
waiting_tasks = 0:demanded_tasks_index;
j = 0;
for (i = demanded_tasks_index; i > 0; i--) {
id = demanded_tasks[i];
if (state[id] == 0) { // waiting
j++;
waiting_tasks[j] = id;
deadlock = 1;
}
}
if (j > 0) {
waiting_tasks = waiting_tasks[1..j];
queue = queue[next..size(queue)];
next = 1;
queue = give_priority(queue, waiting_tasks);
waiting_tasks = 0;
}
for (i = demanded_tasks_index; i > 0; i--) {
id = demanded_tasks[i];
if (state[id] == 1) { // started
deadlock = 0;
}
if (state[id] == 2) { // computed
write(links[assignment[id]], 1);
tmp = removeDemanded(id);
ndemanded--;
results_sent++;
deadlock = 0;
}
}
if (deadlock) {
granted_leaves++;
nlinks++;
link ll(pid)(nlinks) = "ssi:fork";
open(ll(pid)(nlinks));
links[nlinks] = ll(pid)(nlinks);
write(links[nlinks],
quote(startTasks_grandchild(
eval(localtasks[queue[next]].index), eval(pid_parent),
eval(pid), eval(nlinks), eval(sem_write))));
assignment[queue[next]] = nlinks;
state[queue[next]] = 1;
nwaiting--;
next++;
}
}
if (code == 2) { // pollTask
id = read(l(pid_parent));
if (state[id] == 0) { // waiting
queue = queue[next..size(queue)];
next = 1;
queue = give_priority(queue, intvec(id));
}
if (state[id] == 2) { // computed
write(links[assignment[id]], 1);
}
write(l(pid_parent), state[id]);
}
if (code == 3) { // got result
id = read(l(pid_parent));
write(links[assignment[id]],
quote(startTasks_grandchild(
eval(localtasks[queue[next]].index), eval(pid_parent),
eval(pid), eval(assignment[id]), eval(sem_write))));
assignment[queue[next]] = assignment[id];
assignment[id] = 0;
state[queue[next]] = 1;
state[id] = 3;
nwaiting--;
nfinished++;
next++;
}
}
}
while (nfinished < nlocaltasks || ndemanded != -1) {
wait = waitfirst(links);
if (wait <= nlocaltasks) {
code = read(links[wait]);
if (code == 1) { // result computed
id = read(links[wait]);
state[id] = 2;
if (ndemanded > 0 && removeDemanded(id)) {
write(links[wait], 1);
ndemanded--;
results_sent++;
}
}
// code == 2: startTasks_grandchild() ended, do nothing
}
if (wait == nlocaltasks+1) {
code = read(l(pid_parent));
if (code == 0) { // stopTask
id = read(l(pid_parent));
if (state[id] == 1 || state[id] == 2) { // started or computed
close(links[assignment[id]]);
if (nlinks > granted_leaves) {
tmp = system("semaphore", "release", sem_leaves);
}
links[assignment[id]] = def(0);
nlinks--;
assignment[id] = 0;
nfinished++;
}
// else: nothing to do
state[id] = -1;
}
if (code == 1) { // waitTasks
demanded_tasks = read(l(pid_parent));
demanded_tasks_index = size(demanded_tasks);
ndemanded = read(l(pid_parent));
if (ndemanded > demanded_tasks_index) {
ndemanded = demanded_tasks_index;
}
if (demanded_tasks == 0 && ndemanded == -1) {
write(l(pid_parent), results_sent);
continue;
}
else {
results_sent = 0;
}
demanded_tasks = demanded_tasks[demanded_tasks_index..1];
for (i = demanded_tasks_index; i > 0; i--) {
id = demanded_tasks[i];
if (state[id] == 2) { // computed
write(links[assignment[id]], 1);
tmp = removeDemanded(id);
ndemanded--;
results_sent++;
}
}
}
if (code == 2) { // pollTask
id = read(l(pid_parent));
if (state[id] == 2) { // computed
write(links[assignment[id]], 1);
}
write(l(pid_parent), state[id]);
}
if (code == 3) { // got result
id = read(l(pid_parent));
close(links[assignment[id]]);
if (nlinks > granted_leaves) {
tmp = system("semaphore", "release", sem_leaves);
}
links[assignment[id]] = def(0);
nlinks--;
assignment[id] = 0;
state[id] = 3;
nfinished++;
}
}
}
}
/* This procedure has to be started within the grandchildren after forking. */
static proc startTasks_grandchild(int index, int pid_grandparent,
int pid_parent, int link_no, int sem_write)
{
def result;
int tmp = system("semaphore", "acquire", sem_queue);
tmp = system("semaphore", "acquire", sem_cores);
tmp = system("semaphore", "release", sem_queue);
execute("result = "+tasks[index].command+"("
+argsToString("tasks[index].arguments", size(tasks[index].arguments))
+");");
tmp = system("semaphore", "release", sem_cores);
write(ll(pid_parent)(link_no), 1);
write(ll(pid_parent)(link_no), tasks[index].id);
tmp = read(ll(pid_parent)(link_no));
tmp = system("semaphore", "acquire", sem_write);
write(L(pid_grandparent), index);
write(L(pid_grandparent), result);
tmp = system("semaphore", "release", sem_write);
return(2);
}
/* Remove id from demanded_tasks and return 1, if id is an element of
* demanded_tasks; return 0, otherwise. Note:
* - demanded_tasks and demanded_tasks_index are (lib-)global objects
* exported in startTasks_child().
* - demanded_tasks_index is used to avoid copying. It is defined to be
* the greatest integer with demanded_tasks[demanded_tasks_index] != 0
* and demanded_tasks[demanded_tasks_index+1] == 0 (if defined).
*/
static proc removeDemanded(alias int id)
{
if (demanded_tasks[demanded_tasks_index] == id) {
demanded_tasks[demanded_tasks_index] = 0;
demanded_tasks_index--;
return(1);
}
int i;
for (i = demanded_tasks_index-1; i > 0; i--) {
if (demanded_tasks[i] == id) {
demanded_tasks[i..demanded_tasks_index]
= demanded_tasks[(i+1)..demanded_tasks_index], 0;
demanded_tasks_index--;
return(1);
}
}
return(0);
}
/* Move the elements in 'preferred' to the beginning of 'queue'. We may assume
* that
* - 'preferred' is a subset of 'queue';
* - the elements of 'preferred' are distinct and non-zero;
* - the elements of 'queue' are distinct and non-zero.
* For performance reasons, we may also assume that 'queue' and 'preferred' are
* more or less ordered in most cases. Note that queue has the format
* '0, indices, 0'.
*/
static proc give_priority(intvec queue, intvec preferred)
{
int size_queue = size(queue);
int size_preferred = size(preferred);
if (size_queue == size_preferred) {
return(queue);
}
int index = size_queue;
int i;
int j;
for (i = size_preferred; i > 0; i--) {
for (j = size_queue; j > 0; j--) {
if (queue[index] == preferred[i]) {
queue[index] = 0;
break;
}
index--;
if (index == 0) {
index = size_queue;
}
}
}
intvec not_preferred = 0:(size_queue-size_preferred);
index = 1;
for (i = 1; i <= size_queue; i++) {
if (queue[i]) {
not_preferred[index] = queue[i];
index++;
}
}
queue = preferred, not_preferred;
return(queue);
}
proc stopTask(task t)
"USAGE: stopTask(t), t task
RETURN: nothing. Stops the t and sets its state to 'stopped'.
NOTE: A task whose state is not 'started' cannot be stopped.
@* Intermediate results are discarded when a task is stopped.
@* killTask() should be called for any no longer needed task.
SEE ALSO: startTasks, waitTasks, pollTask, getState, killTask, printTask
EXAMPLE: example stopTask; shows an example"
{
if (t.index == 0) {
ERROR("cannot stop an uninitialized task");
}
if (typeof(tasks[t.index]) != "internal_task") {
ERROR("cannot stop an uninitialized task");
}
if (tasks[t.index].state != "started") {
ERROR("cannot stop a task whose state is not 'started'");
}
write(tasks[t.index].links[2], 0);
write(tasks[t.index].links[2], tasks[t.index].id);
tasks[t.index].id = 0;
tasks[t.index].links = list();
tasks[t.index].linkID = 0;
tasks[t.index].state = "stopped";
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t = "std", list(I);
startTasks(t);
stopTask(t);
t;
killTask(t);
}
proc waitTasks(list T, int N, list #)
"USAGE: waitTasks(T, N[, timeout]), T list of tasks, N int, timeout int
RETURN: an ordered list of the indices of those tasks which have been
successfully completed. The state of these tasks is set to
'completed'.
@* The procedure waits for N tasks to complete.
@* An optional timeout in ms can be provided. Default is 0 which
disables the timeout.
NOTE: A task whose state is neither 'started' nor 'completed' cannot be
waited for.
@* The result of any completed task can be accessed via @ref{getResult}.
@* The returned list may contain more than N entries if the computation
of some tasks has already finished and/or if several tasks finish
\"at the same time\". It may contain less than N entries in
the case of timeout or errors occurring.
@* Polling is guaranteed, i.e. the index of any task t for which
'pollTask(t);' would return 1 will appear in the returned list.
SEE ALSO: startTasks, pollTask, getResult, getState, printTask
EXAMPLE: example waitTasks; shows an example"
{
/* initialize the timer */
int oldtimerresolution = system("--ticks-per-sec");
system("--ticks-per-sec", 1000);
int starting_time = rtimer;
/* read optional parameters */
int timeout;
if (size(#) > 0) {
if (size(#) > 1 || typeof(#[1]) != "int") {
ERROR("wrong optional parameter");
}
timeout = #[1];
}
/* check for errors */
if (timeout < 0) {
ERROR("negative timeout");
}
int nargs = size(T);
if (nargs == 0) {
ERROR("missing task");
}
if (N < 1 || N > nargs) {
ERROR("wrong number of tasks to wait for");
}
int i;
for (i = nargs; i > 0; i--) {
if (typeof(T[i]) != "task") {
ERROR("element not of type 'task' (element no. "+string(i)+")");
}
if (T[i].index == 0) {
ERROR("cannot wait for an uninitialized task (task no. "+string(i)
+")");
}
if (typeof(tasks[T[i].index]) != "internal_task") {
ERROR("cannot wait for an uninitialized task (task no. "+string(i)
+")");
}
if (tasks[T[i].index].state != "started"
&& tasks[T[i].index].state != "completed") {
ERROR("cannot wait for a task whose state is not"+newline
+"'started' or 'completed' (task no. "+string(i)+")");
}
}
/* sort the tasks */
int ncompleted;
list requests;
list links;
int sorted_in;
int j;
for (i = 1; i <= nargs; i++) {
if (tasks[T[i].index].state == "completed") {
ncompleted++;
}
else { // tasks[T[i].index].state == "started"
sorted_in = 0;
for (j = size(requests); j > 0; j--) {
if (requests[j][1] == tasks[T[i].index].linkID) {
requests[j][2][size(requests[j][2])+1] =
tasks[T[i].index].id;
sorted_in = 1;
break;
}
}
if (!sorted_in) {
requests[size(requests)+1] = list(tasks[T[i].index].linkID,
intvec(tasks[T[i].index].id),
tasks[T[i].index].links[2]);
links[size(links)+1] = tasks[T[i].index].links[1];
}
}
}
/* send the reqests */
for (j = size(requests); j > 0; j--) {
write(requests[j][3], 1);
write(requests[j][3], requests[j][2]);
write(requests[j][3], N-ncompleted);
}
/* wait for the results */
int wait;
int index;
int results_got;
int remaining_time;
int tmp;
while (ncompleted < N) {
wait = waitfirst(links, 0);
if (wait == 0) {
if (timeout == 0) {
tmp = system("semaphore", "release", sem_cores);
wait = waitfirst(links);
tmp = system("semaphore", "acquire", sem_cores);
}
else {
remaining_time = timeout-(rtimer-starting_time);
if (remaining_time < 0) {
break;
}
else {
tmp = system("semaphore", "release", sem_cores);
wait = waitfirst(links, remaining_time);
tmp = system("semaphore", "acquire", sem_cores);
}
}
}
if (wait < 1) {
break;
}
index = read(links[wait]);
tasks[index].result = read(links[wait]);
write(tasks[index].links[2], 3);
write(tasks[index].links[2], tasks[index].id);
tasks[index].id = 0;
tasks[index].links = list();
tasks[index].linkID = 0;
tasks[index].state = "completed";
ncompleted++;
results_got++;
}
if (wait == -1) {
ERROR("error in waitfirst()");
}
/* end communication process */
for (j = size(requests); j > 0; j--) {
write(requests[j][3], 1);
write(requests[j][3], 0);
write(requests[j][3], -1);
}
int results_sent;
for (j = size(requests); j > 0; j--) {
results_sent = results_sent + read(requests[j][3]);
}
while (results_sent > results_got) {
wait = waitfirst(links);
if (wait == -1) {
ERROR("error in waitfirst()");
}
index = read(links[wait]);
tasks[index].result = read(links[wait]);
write(tasks[index].links[2], 3);
write(tasks[index].links[2], tasks[index].id);
tasks[index].id = 0;
tasks[index].links = list();
tasks[index].linkID = 0;
tasks[index].state = "completed";
results_got++;
}
/* list completed tasks */
list completed;
completed[nargs+1] = 0;
j = 0;
for (i = 1; i <= nargs; i++) {
if (tasks[T[i].index].state == "completed") {
j++;
completed[j] = i;
}
}
completed[nargs+1] = def(0);
/* return the result */
system("--ticks-per-sec", oldtimerresolution);
return(completed);
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t1 = "std", list(I);
task t2 = "slimgb", list(I);
startTasks(t1, t2);
waitTasks(list(t1, t2), 2); // wait for both tasks
getResult(t1);
getResult(t2);
killTask(t1);
killTask(t2);
}
proc waitAllTasks(list #)
"USAGE: waitAllTasks(t1, t2, ...), t1, t2, ... tasks
RETURN: nothing. Waits for all the tasks t1, t2, ... to complete. The state
of the tasks is set to 'completed'.
NOTE: A task whose state is neither 'started' nor 'completed' cannot be
waited for.
@* The result of any completed task can be accessed via @ref{getResult}.
@* 'waitAllTasks(t1, t2, ...);' is a shortcut for
'waitTasks(list(t1, t2, ...), size(list(t1, t2, ...)));'. Since
returning a list of the indices of the completed tasks does not make
sense in this case, nothing is returned.
SEE ALSO: waitTasks, startTasks, pollTask, getResult, getState, printTask
EXAMPLE: example waitAllTasks; shows an example"
{
list tmp = waitTasks(#, size(#));
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t1 = "std", list(I);
task t2 = "slimgb", list(I);
startTasks(t1, t2);
waitAllTasks(t1, t2); // the same as 'waitTasks(list(t1, t2), 2);',
// but without return value
getResult(t1);
getResult(t2);
killTask(t1);
killTask(t2);
}
proc pollTask(task t)
"USAGE: pollTask(t), t task
RETURN: 1, if the computation of the task t has successfully finished;
0, otherwise.
@* The state of any task whose computation has successfully finished is
set to 'completed'.
NOTE: A task whose state is neither 'started' nor 'completed' cannot be
polled.
@* The result of any completed task can be accessed via @ref{getResult}.
@* pollTask() should return immediately. However, receiving the result
of the task may take some time.
SEE ALSO: startTasks, waitTasks, getResult, getState, printTask
EXAMPLE: example pollTask; shows an example"
{
if (t.index == 0) {
ERROR("cannot poll an uninitialized task");
}
if (typeof(tasks[t.index]) != "internal_task") {
ERROR("cannot poll an uninitialized task");
}
if (tasks[t.index].state != "started"
&& tasks[t.index].state != "completed") {
ERROR("cannot poll a task whose state is not"+newline
+"'started' or 'completed'");
}
if (tasks[t.index].state == "completed") {
return(1);
}
// tasks[t.index].state == "started"
write(tasks[t.index].links[2], 2);
write(tasks[t.index].links[2], tasks[t.index].id);
int state = read(tasks[t.index].links[2]);
if (state == 0 || state == 1) { // waiting or started
return(0);
}
if (state == 2) { // computed
int index = read(tasks[t.index].links[1]); // index == t.index
tasks[t.index].result = read(tasks[t.index].links[1]);
write(tasks[t.index].links[2], 3);
write(tasks[t.index].links[2], tasks[t.index].id);
tasks[t.index].id = 0;
tasks[t.index].links = list();
tasks[t.index].linkID = 0;
tasks[t.index].state = "completed";
return(1);
}
// state == -1 (stopped) or state == 3 (sent) should not happen
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t = "std", list(I);
startTasks(t);
waitAllTasks(t);
pollTask(t); // task already completed
t;
getResult(t);
killTask(t);
}
proc getCommand(task t)
"USAGE: getCommand(t), t task
RETURN: a string, the command of t.
NOTE: This command cannot be applied to tasks whose state is
'uninitialized'.
SEE ALSO: getArguments, getResult, getState, createTask, printTask
EXAMPLE: example getCommand; shows an example"
{
if (t.index == 0) {
ERROR("cannot get command of an uninitialized task");
}
if (typeof(tasks[t.index]) != "internal_task") {
ERROR("cannot get command of an uninitialized task");
}
return(tasks[t.index].command);
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t = "std", list(I);
getCommand(t);
killTask(t);
}
proc getArguments(task t)
"USAGE: getArguments(t), t task
RETURN: a list, the arguments of t.
NOTE: This command cannot be applied to tasks whose state is
'uninitialized'.
SEE ALSO: getCommand, getResult, getState, createTask, printTask
EXAMPLE: example getArguments; shows an example"
{
if (t.index == 0) {
ERROR("cannot get arguments of an uninitialized task");
}
if (typeof(tasks[t.index]) != "internal_task") {
ERROR("cannot get arguments of an uninitialized task");
}
return(tasks[t.index].arguments);
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t = "std", list(I);
getArguments(t);
killTask(t);
}
proc getResult(task t)
"USAGE: getResult(t), t task
RETURN: the result of t.
NOTE: This command cannot be applied to tasks whose state is not
'completed'.
SEE ALSO: getCommand, getArguments, getState, waitTasks, pollTask, printTask
EXAMPLE: example getResult; shows an example"
{
if (t.index == 0) {
ERROR("cannot get result of an uninitialized task");
}
if (typeof(tasks[t.index]) != "internal_task") {
ERROR("cannot get result of an uninitialized task");
}
if (tasks[t.index].state != "completed") {
ERROR("cannot get result of a task which is not completed");
}
return(tasks[t.index].result);
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t = "std", list(I);
startTasks(t);
waitAllTasks(t);
getResult(t);
killTask(t);
}
proc getState(task t)
"USAGE: getState(t), t task
RETURN: a string, the state of t.
SEE ALSO: getCommand, getArguments, getResult, printTask, createTask,
startTasks, stopTask, waitTasks, pollTask, killTask
EXAMPLE: example getState; shows an example"
{
if (t.index == 0) {
return("uninitialized");
}
if (typeof(tasks[t.index]) != "internal_task") {
return("uninitialized");
}
return(tasks[t.index].state);
}
example
{
"EXAMPLE:";
echo = 2;
ring R = 0, (x,y), dp;
ideal I = x9y2+x10, x2y7-y8;
task t = "std", list(I);
getState(t);
startTasks(t);
getState(t);
waitAllTasks(t);
getState(t);
killTask(t);
getState(t);
}
/ * construct the string "name[1], name[2], name[3], ..., name[length]" */
static proc argsToString(string name, int length)
{
string output;
if (length > 0) {
output = name+"[1]";
}
int i;
for (i = 2; i <= length; i++) {
output = output+", "+name+"["+string(i)+"]";
}
return(output);
}
|