/usr/include/root/TProofServ.h is in libroot-proof-dev 5.34.14-1build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 | // @(#)root/proof:$Id$
// Author: Fons Rademakers 16/02/97
/*************************************************************************
* Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
#ifndef ROOT_TProofServ
#define ROOT_TProofServ
//////////////////////////////////////////////////////////////////////////
// //
// TProofServ //
// //
// TProofServ is the PROOF server. It can act either as the master //
// server or as a slave server, depending on its startup arguments. It //
// receives and handles message coming from the client or from the //
// master server. //
// //
//////////////////////////////////////////////////////////////////////////
#ifndef ROOT_TApplication
#include "TApplication.h"
#endif
#ifndef ROOT_TString
#include "TString.h"
#endif
#ifndef ROOT_TSysEvtHandler
#include "TSysEvtHandler.h"
#endif
#ifndef ROOT_TStopwatch
#include "TStopwatch.h"
#endif
#ifndef ROOT_TTimer
#include "TTimer.h"
#endif
#ifndef ROOT_TProofQueryResult
#include "TProofQueryResult.h"
#endif
class TDataSetManager;
class TDataSetManagerFile;
class TDSet;
class TDSetElement;
class TFileCollection;
class TFileHandler;
class THashList;
class TIdleTOTimer;
class TList;
class TMap;
class TMessage;
class TMonitor;
class TMutex;
class TProof;
class TProofLockPath;
class TQueryResultManager;
class TReaperTimer;
class TServerSocket;
class TShutdownTimer;
class TSocket;
class TVirtualProofPlayer;
// Hook to external function setting up authentication related stuff
// for old versions.
// For backward compatibility
typedef Int_t (*OldProofServAuthSetup_t)(TSocket *, Bool_t, Int_t,
TString &, TString &, TString &);
class TProofServ : public TApplication {
friend class TProofServLite;
friend class TXProofServ;
public:
enum EStatusBits { kHighMemory = BIT(16) };
enum EQueryAction { kQueryOK, kQueryModify, kQueryStop, kQueryEnqueued };
private:
TString fService; //service we are running, either "proofserv" or "proofslave"
TString fUser; //user as which we run
TString fGroup; //group the user belongs to
TString fConfDir; //directory containing cluster config information
TString fConfFile; //file containing config information
TString fWorkDir; //directory containing all proof related info
TString fImage; //image name of the session
TString fSessionTag; //tag for the server session
TString fTopSessionTag; //tag for the global session
TString fSessionDir; //directory containing session dependent files
TString fPackageDir; //directory containing packages and user libs
THashList *fGlobalPackageDirList; //list of directories containing global packages libs
TString fCacheDir; //directory containing cache of user files
TString fQueryDir; //directory containing query results and status
TString fDataSetDir; //directory containing info about known data sets
TString fDataDir; //directory containing data files produced during queries
TString fDataDirOpts; //Url type options for fDataDir
TString fAdminPath; //admin path for this session
TString fOutputFile; //path with the temporary results of the current or last query
TProofLockPath *fPackageLock; //package dir locker
TProofLockPath *fCacheLock; //cache dir locker
TProofLockPath *fQueryLock; //query dir locker
TString fArchivePath; //default archive path
TSocket *fSocket; //socket connection to client
TProof *fProof; //PROOF talking to slave servers
TVirtualProofPlayer *fPlayer; //actual player
FILE *fLogFile; //log file
Int_t fLogFileDes; //log file descriptor
Long64_t fLogFileMaxSize; //max size for log files (enabled if > 0)
TList *fEnabledPackages; //list of enabled packages
Int_t fProtocol; //protocol version number
TString fOrdinal; //slave ordinal number
Int_t fGroupId; //slave unique id in the active slave group
Int_t fGroupSize; //size of the active slave group
Int_t fLogLevel; //debug logging level
Int_t fNcmd; //command history number
Int_t fGroupPriority; //priority of group the user belongs to (0 - 100)
Bool_t fEndMaster; //true for a master in direct contact only with workers
Bool_t fMasterServ; //true if we are a master server
Bool_t fInterrupt; //if true macro execution will be stopped
Float_t fRealTime; //real time spent executing commands
Float_t fCpuTime; //CPU time spent executing commands
TStopwatch fLatency; //measures latency of packet requests
TStopwatch fCompute; //measures time spent processing a packet
TStopwatch fSaveOutput; //measures time spent saving the partial result
Int_t fQuerySeqNum; //sequential number of the current or last query
Int_t fTotSessions; //Total number of PROOF sessions on the cluster
Int_t fActSessions; //Total number of active PROOF sessions on the cluster
Float_t fEffSessions; //Effective Number of PROOF sessions on the assigned machines
TFileHandler *fInputHandler; //Input socket handler
TQueryResultManager *fQMgr; //Query-result manager
TList *fWaitingQueries; //list of TProofQueryResult waiting to be processed
Bool_t fIdle; //TRUE if idle
TMutex *fQMtx; // To protect async msg queue
TList *fQueuedMsg; //list of messages waiting to be processed
TString fPrefix; //Prefix identifying the node
Bool_t fRealTimeLog; //TRUE if log messages should be send back in real-time
TShutdownTimer *fShutdownTimer; // Timer used to shutdown out-of-control sessions
TReaperTimer *fReaperTimer; // Timer used to control children state
TIdleTOTimer *fIdleTOTimer; // Timer used to control children state
Int_t fInflateFactor; // Factor in 1/1000 to inflate the CPU time
Int_t fCompressMsg; // Compression level for messages
TDataSetManager* fDataSetManager; // dataset manager
TDataSetManagerFile *fDataSetStgRepo; // repository for staging requests
Bool_t fSendLogToMaster; // On workers, controls logs sending to master
TServerSocket *fMergingSocket; // Socket used for merging outputs if submerger
TMonitor *fMergingMonitor; // Monitor for merging sockets
Int_t fMergedWorkers; // Number of workers merged
// Quotas (-1 to disable)
Int_t fMaxQueries; //Max number of queries fully kept
Long64_t fMaxBoxSize; //Max size of the sandbox
Long64_t fHWMBoxSize; //High-Water-Mark on the sandbox size
// Memory limits (-1 to disable) set by envs ROOTPROFOASHARD, PROOF_VIRTMEMMAX, PROOF_RESMEMMAX
static Long_t fgVirtMemMax; //Hard limit enforced by the system (in kB)
static Long_t fgResMemMax; //Hard limit on the resident memory checked
//in TProofPlayer::Process (in kB)
static Float_t fgMemHWM; // Threshold fraction of max for warning and finer monitoring
static Float_t fgMemStop; // Fraction of max for stop processing
// In bytes; default is 1MB
Long64_t fMsgSizeHWM; //High-Water-Mark on the size of messages with results
static FILE *fgErrorHandlerFile; // File where to log
static Int_t fgRecursive; // Keep track of recursive inputs during processing
// Control sending information to syslog
static Int_t fgLogToSysLog; // >0 sent to syslog too
static TString fgSysLogService; // name of the syslog service (eg: proofm-0, proofw-0.67)
static TString fgSysLogEntity; // logging entity (<user>:<group>)
Int_t GetCompressionLevel() const;
void RedirectOutput(const char *dir = 0, const char *mode = "w");
Int_t CatMotd();
Int_t UnloadPackage(const char *package);
Int_t UnloadPackages();
Int_t OldAuthSetup(TString &wconf);
Int_t GetPriority();
// Query handlers
TProofQueryResult *MakeQueryResult(Long64_t nentries, const char *opt,
TList *inl, Long64_t first, TDSet *dset,
const char *selec, TObject *elist);
void SetQueryRunning(TProofQueryResult *pq);
// Results handling
Int_t SendResults(TSocket *sock, TList *outlist = 0, TQueryResult *pq = 0);
Bool_t AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer);
// Waiting queries handlers
void SetIdle(Bool_t st = kTRUE);
Bool_t IsWaiting();
Int_t WaitingQueries();
Int_t QueueQuery(TProofQueryResult *pq);
TProofQueryResult *NextQuery();
Int_t CleanupWaitingQueries(Bool_t del = kTRUE, TList *qls = 0);
protected:
virtual void HandleArchive(TMessage *mess, TString *slb = 0);
virtual Int_t HandleCache(TMessage *mess, TString *slb = 0);
virtual void HandleCheckFile(TMessage *mess, TString *slb = 0);
virtual Int_t HandleDataSets(TMessage *mess, TString *slb = 0);
virtual void HandleSubmerger(TMessage *mess);
virtual void HandleFork(TMessage *mess);
virtual void HandleLibIncPath(TMessage *mess);
virtual void HandleProcess(TMessage *mess, TString *slb = 0);
virtual void HandleQueryList(TMessage *mess);
virtual void HandleRemove(TMessage *mess, TString *slb = 0);
virtual void HandleRetrieve(TMessage *mess, TString *slb = 0);
virtual Int_t HandleWorkerLists(TMessage *mess);
virtual void ProcessNext(TString *slb = 0);
virtual Int_t Setup();
Int_t SetupCommon();
virtual void MakePlayer();
virtual void DeletePlayer();
virtual Int_t Fork();
Int_t GetSessionStatus();
Bool_t IsIdle();
Bool_t UnlinkDataDir(const char *path);
static TString fgLastMsg; // Message about status before exception
static Long64_t fgLastEntry; // Last entry before exception
public:
TProofServ(Int_t *argc, char **argv, FILE *flog = 0);
virtual ~TProofServ();
virtual Int_t CreateServer();
TProof *GetProof() const { return fProof; }
const char *GetService() const { return fService; }
const char *GetConfDir() const { return fConfDir; }
const char *GetConfFile() const { return fConfFile; }
const char *GetUser() const { return fUser; }
const char *GetGroup() const { return fGroup; }
const char *GetWorkDir() const { return fWorkDir; }
const char *GetImage() const { return fImage; }
const char *GetSessionTag() const { return fSessionTag; }
const char *GetTopSessionTag() const { return fTopSessionTag; }
const char *GetSessionDir() const { return fSessionDir; }
const char *GetPackageDir() const { return fPackageDir; }
const char *GetDataDir() const { return fDataDir; }
const char *GetDataDirOpts() const { return fDataDirOpts; }
Int_t GetProtocol() const { return fProtocol; }
const char *GetOrdinal() const { return fOrdinal; }
Int_t GetGroupId() const { return fGroupId; }
Int_t GetGroupSize() const { return fGroupSize; }
Int_t GetLogLevel() const { return fLogLevel; }
TSocket *GetSocket() const { return fSocket; }
Float_t GetRealTime() const { return fRealTime; }
Float_t GetCpuTime() const { return fCpuTime; }
Int_t GetQuerySeqNum() const { return fQuerySeqNum; }
Int_t GetTotSessions() const { return fTotSessions; }
Int_t GetActSessions() const { return fActSessions; }
Float_t GetEffSessions() const { return fEffSessions; }
void GetOptions(Int_t *argc, char **argv);
TList *GetEnabledPackages() const { return fEnabledPackages; }
Int_t GetInflateFactor() const { return fInflateFactor; }
static Long_t GetVirtMemMax();
static Long_t GetResMemMax();
static Float_t GetMemHWM();
static Float_t GetMemStop();
Long64_t GetMsgSizeHWM() const { return fMsgSizeHWM; }
const char *GetPrefix() const { return fPrefix; }
void FlushLogFile();
void TruncateLogFile(); // Called also by TDSetProxy::Next()
TProofLockPath *GetCacheLock() { return fCacheLock; } //cache dir locker; used by TProofPlayer
Int_t CopyFromCache(const char *name, Bool_t cpbin);
Int_t CopyToCache(const char *name, Int_t opt = 0);
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange,
Bool_t resume = kFALSE);
virtual void HandleException(Int_t sig);
virtual Int_t HandleSocketInput(TMessage *mess, Bool_t all);
virtual void HandleSocketInput();
virtual void HandleUrgentData();
virtual void HandleSigPipe();
virtual void HandleTermination() { Terminate(0); }
void Interrupt() { fInterrupt = kTRUE; }
Bool_t IsEndMaster() const { return fEndMaster; }
Bool_t IsMaster() const { return fMasterServ; }
Bool_t IsParallel() const;
Bool_t IsTopMaster() const { return fOrdinal == "0"; }
void Run(Bool_t retrn = kFALSE);
void Print(Option_t *option="") const;
void RestartComputeTime();
TObject *Get(const char *namecycle);
TDSetElement *GetNextPacket(Long64_t totalEntries = -1);
virtual void ReleaseWorker(const char *) { }
void Reset(const char *dir);
Int_t ReceiveFile(const char *file, Bool_t bin, Long64_t size);
virtual Int_t SendAsynMessage(const char *msg, Bool_t lf = kTRUE);
virtual void SendLogFile(Int_t status = 0, Int_t start = -1, Int_t end = -1);
void SendStatistics();
void SendParallel(Bool_t async = kFALSE);
Int_t UpdateSessionStatus(Int_t xst = -1);
// Disable / Enable read timeout
virtual void DisableTimeout() { }
virtual void EnableTimeout() { }
virtual void Terminate(Int_t status);
// Log control
void LogToMaster(Bool_t on = kTRUE) { fSendLogToMaster = on; }
static FILE *SetErrorHandlerFile(FILE *ferr);
static void ErrorHandler(Int_t level, Bool_t abort, const char *location,
const char *msg);
static void ResolveKeywords(TString &fname, const char *path = 0);
static void SetLastMsg(const char *lastmsg);
static void SetLastEntry(Long64_t lastentry);
// To handle local data server related paths
static void FilterLocalroot(TString &path, const char *url = "root://dum/");
static void GetLocalServer(TString &dsrv);
// To prepara ethe map of files to process
static TMap *GetDataSetNodeMap(TFileCollection *fc, TString &emsg);
static Int_t RegisterDataSets(TList *in, TList *out, TDataSetManager *dsm, TString &e);
static Bool_t IsActive();
static TProofServ *This();
ClassDef(TProofServ,0) //PROOF Server Application Interface
};
R__EXTERN TProofServ *gProofServ;
class TProofLockPath : public TNamed {
private:
Int_t fLockId; //file id of dir lock
public:
TProofLockPath(const char *path) : TNamed(path,path), fLockId(-1) { }
~TProofLockPath() { if (IsLocked()) Unlock(); }
Int_t Lock();
Int_t Unlock();
Bool_t IsLocked() const { return (fLockId > -1); }
};
class TProofLockPathGuard {
private:
TProofLockPath *fLocker; //locker instance
public:
TProofLockPathGuard(TProofLockPath *l) { fLocker = l; if (fLocker) fLocker->Lock(); }
~TProofLockPathGuard() { if (fLocker) fLocker->Unlock(); }
};
//----- Handles output from commands executed externally via a pipe. ---------//
//----- The output is redirected one level up (i.e., to master or client). ---//
//______________________________________________________________________________
class TProofServLogHandler : public TFileHandler {
private:
TSocket *fSocket; // Socket where to redirect the message
FILE *fFile; // File connected with the open pipe
TString fPfx; // Prefix to be prepended to messages
static TString fgPfx; // Default prefix to be prepended to messages
static Int_t fgCmdRtn; // Return code of the command execution (available only
// after closing the pipe)
public:
enum EStatusBits { kFileIsPipe = BIT(23) };
TProofServLogHandler(const char *cmd, TSocket *s, const char *pfx = "");
TProofServLogHandler(FILE *f, TSocket *s, const char *pfx = "");
virtual ~TProofServLogHandler();
Bool_t IsValid() { return ((fFile && fSocket) ? kTRUE : kFALSE); }
Bool_t Notify();
Bool_t ReadNotify() { return Notify(); }
static void SetDefaultPrefix(const char *pfx);
static Int_t GetCmdRtn();
};
//--- Guard class: close pipe, deactivatethe related descriptor --------------//
//______________________________________________________________________________
class TProofServLogHandlerGuard {
private:
TProofServLogHandler *fExecHandler;
public:
TProofServLogHandlerGuard(const char *cmd, TSocket *s,
const char *pfx = "", Bool_t on = kTRUE);
TProofServLogHandlerGuard(FILE *f, TSocket *s,
const char *pfx = "", Bool_t on = kTRUE);
virtual ~TProofServLogHandlerGuard();
};
//--- Special timer to control delayed shutdowns
//______________________________________________________________________________
class TShutdownTimer : public TTimer {
private:
TProofServ *fProofServ;
Int_t fTimeout;
public:
TShutdownTimer(TProofServ *p, Int_t delay);
Bool_t Notify();
};
//--- Synchronous timer used to reap children processes change of state
//______________________________________________________________________________
class TReaperTimer : public TTimer {
private:
TList *fChildren; // List of children (forked) processes
public:
TReaperTimer(Long_t frequency = 1000) : TTimer(frequency, kTRUE), fChildren(0) { }
virtual ~TReaperTimer();
void AddPid(Int_t pid);
Bool_t Notify();
};
//--- Special timer to terminate idle sessions
//______________________________________________________________________________
class TIdleTOTimer : public TTimer {
private:
TProofServ *fProofServ;
public:
TIdleTOTimer(TProofServ *p, Int_t delay) : TTimer(delay, kTRUE), fProofServ(p) { }
Bool_t Notify();
};
//______________________________________________________________________________
class TIdleTOTimerGuard {
private:
TIdleTOTimer *fIdleTOTimer;
public:
TIdleTOTimerGuard(TIdleTOTimer *t) : fIdleTOTimer(t) { if (fIdleTOTimer) fIdleTOTimer->Stop(); }
virtual ~TIdleTOTimerGuard() { if (fIdleTOTimer) fIdleTOTimer->Start(-1, kTRUE); }
};
//______________________________________________________________________________
inline Int_t TProofServ::GetCompressionLevel() const
{
return (fCompressMsg < 0) ? -1 : fCompressMsg % 100;
}
#endif
|