/usr/include/root/TProofServ.h is in libroot-proof-dev 5.34.14-1build1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
| // @(#)root/proof:$Id$
// Author: Fons Rademakers 16/02/97
/*************************************************************************
* Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
#ifndef ROOT_TProofServ
#define ROOT_TProofServ
//////////////////////////////////////////////////////////////////////////
// //
// TProofServ //
// //
// TProofServ is the PROOF server. It can act either as the master //
// server or as a slave server, depending on its startup arguments. It //
// receives and handles message coming from the client or from the //
// master server. //
// //
//////////////////////////////////////////////////////////////////////////
#ifndef ROOT_TApplication
#include "TApplication.h"
#endif
#ifndef ROOT_TString
#include "TString.h"
#endif
#ifndef ROOT_TSysEvtHandler
#include "TSysEvtHandler.h"
#endif
#ifndef ROOT_TStopwatch
#include "TStopwatch.h"
#endif
#ifndef ROOT_TTimer
#include "TTimer.h"
#endif
#ifndef ROOT_TProofQueryResult
#include "TProofQueryResult.h"
#endif
class TDataSetManager;
class TDataSetManagerFile;
class TDSet;
class TDSetElement;
class TFileCollection;
class TFileHandler;
class THashList;
class TIdleTOTimer;
class TList;
class TMap;
class TMessage;
class TMonitor;
class TMutex;
class TProof;
class TProofLockPath;
class TQueryResultManager;
class TReaperTimer;
class TServerSocket;
class TShutdownTimer;
class TSocket;
class TVirtualProofPlayer;
// Hook to external function setting up authentication related stuff
// for old versions.
// For backward compatibility
typedef Int_t (*OldProofServAuthSetup_t)(TSocket *, Bool_t, Int_t,
TString &, TString &, TString &);
class TProofServ : public TApplication {
friend class TProofServLite;
friend class TXProofServ;
public:
enum EStatusBits { kHighMemory = BIT(16) };
enum EQueryAction { kQueryOK, kQueryModify, kQueryStop, kQueryEnqueued };
private:
TString fService; //service we are running, either "proofserv" or "proofslave"
TString fUser; //user as which we run
TString fGroup; //group the user belongs to
TString fConfDir; //directory containing cluster config information
TString fConfFile; //file containing config information
TString fWorkDir; //directory containing all proof related info
TString fImage; //image name of the session
TString fSessionTag; //tag for the server session
TString fTopSessionTag; //tag for the global session
TString fSessionDir; //directory containing session dependent files
TString fPackageDir; //directory containing packages and user libs
THashList *fGlobalPackageDirList; //list of directories containing global packages libs
TString fCacheDir; //directory containing cache of user files
TString fQueryDir; //directory containing query results and status
TString fDataSetDir; //directory containing info about known data sets
TString fDataDir; //directory containing data files produced during queries
TString fDataDirOpts; //Url type options for fDataDir
TString fAdminPath; //admin path for this session
TString fOutputFile; //path with the temporary results of the current or last query
TProofLockPath *fPackageLock; //package dir locker
TProofLockPath *fCacheLock; //cache dir locker
TProofLockPath *fQueryLock; //query dir locker
TString fArchivePath; //default archive path
TSocket *fSocket; //socket connection to client
TProof *fProof; //PROOF talking to slave servers
TVirtualProofPlayer *fPlayer; //actual player
FILE *fLogFile; //log file
Int_t fLogFileDes; //log file descriptor
Long64_t fLogFileMaxSize; //max size for log files (enabled if > 0)
TList *fEnabledPackages; //list of enabled packages
Int_t fProtocol; //protocol version number
TString fOrdinal; //slave ordinal number
Int_t fGroupId; //slave unique id in the active slave group
Int_t fGroupSize; //size of the active slave group
Int_t fLogLevel; //debug logging level
Int_t fNcmd; //command history number
Int_t fGroupPriority; //priority of group the user belongs to (0 - 100)
Bool_t fEndMaster; //true for a master in direct contact only with workers
Bool_t fMasterServ; //true if we are a master server
Bool_t fInterrupt; //if true macro execution will be stopped
Float_t fRealTime; //real time spent executing commands
Float_t fCpuTime; //CPU time spent executing commands
TStopwatch fLatency; //measures latency of packet requests
TStopwatch fCompute; //measures time spent processing a packet
TStopwatch fSaveOutput; //measures time spent saving the partial result
Int_t fQuerySeqNum; //sequential number of the current or last query
Int_t fTotSessions; //Total number of PROOF sessions on the cluster
Int_t fActSessions; //Total number of active PROOF sessions on the cluster
Float_t fEffSessions; //Effective Number of PROOF sessions on the assigned machines
TFileHandler *fInputHandler; //Input socket handler
TQueryResultManager *fQMgr; //Query-result manager
TList *fWaitingQueries; //list of TProofQueryResult waiting to be processed
Bool_t fIdle; //TRUE if idle
TMutex *fQMtx; // To protect async msg queue
TList *fQueuedMsg; //list of messages waiting to be processed
TString fPrefix; //Prefix identifying the node
Bool_t fRealTimeLog; //TRUE if log messages should be send back in real-time
TShutdownTimer *fShutdownTimer; // Timer used to shutdown out-of-control sessions
TReaperTimer *fReaperTimer; // Timer used to control children state
TIdleTOTimer *fIdleTOTimer; // Timer used to control children state
Int_t fInflateFactor; // Factor in 1/1000 to inflate the CPU time
Int_t fCompressMsg; // Compression level for messages
TDataSetManager* fDataSetManager; // dataset manager
TDataSetManagerFile *fDataSetStgRepo; // repository for staging requests
Bool_t fSendLogToMaster; // On workers, controls logs sending to master
TServerSocket *fMergingSocket; // Socket used for merging outputs if submerger
TMonitor *fMergingMonitor; // Monitor for merging sockets
Int_t fMergedWorkers; // Number of workers merged
// Quotas (-1 to disable)
Int_t fMaxQueries; //Max number of queries fully kept
Long64_t fMaxBoxSize; //Max size of the sandbox
Long64_t fHWMBoxSize; //High-Water-Mark on the sandbox size
// Memory limits (-1 to disable) set by envs ROOTPROFOASHARD, PROOF_VIRTMEMMAX, PROOF_RESMEMMAX
static Long_t fgVirtMemMax; //Hard limit enforced by the system (in kB)
static Long_t fgResMemMax; //Hard limit on the resident memory checked
//in TProofPlayer::Process (in kB)
static Float_t fgMemHWM; // Threshold fraction of max for warning and finer monitoring
static Float_t fgMemStop; // Fraction of max for stop processing
// In bytes; default is 1MB
Long64_t fMsgSizeHWM; //High-Water-Mark on the size of messages with results
static FILE *fgErrorHandlerFile; // File where to log
static Int_t fgRecursive; // Keep track of recursive inputs during processing
// Control sending information to syslog
static Int_t fgLogToSysLog; // >0 sent to syslog too
static TString fgSysLogService; // name of the syslog service (eg: proofm-0, proofw-0.67)
static TString fgSysLogEntity; // logging entity (<user>:<group>)
Int_t GetCompressionLevel() const;
void RedirectOutput(const char *dir = 0, const char *mode = "w");
Int_t CatMotd();
Int_t UnloadPackage(const char *package);
Int_t UnloadPackages();
Int_t OldAuthSetup(TString &wconf);
Int_t GetPriority();
// Query handlers
TProofQueryResult *MakeQueryResult(Long64_t nentries, const char *opt,
TList *inl, Long64_t first, TDSet *dset,
const char *selec, TObject *elist);
void SetQueryRunning(TProofQueryResult *pq);
// Results handling
Int_t SendResults(TSocket *sock, TList *outlist = 0, TQueryResult *pq = 0);
Bool_t AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer);
// Waiting queries handlers
void SetIdle(Bool_t st = kTRUE);
Bool_t IsWaiting();
Int_t WaitingQueries();
Int_t QueueQuery(TProofQueryResult *pq);
TProofQueryResult *NextQuery();
Int_t CleanupWaitingQueries(Bool_t del = kTRUE, TList *qls = 0);
protected:
virtual void HandleArchive(TMessage *mess, TString *slb = 0);
virtual Int_t HandleCache(TMessage *mess, TString *slb = 0);
virtual void HandleCheckFile(TMessage *mess, TString *slb = 0);
virtual Int_t HandleDataSets(TMessage *mess, TString *slb = 0);
virtual void HandleSubmerger(TMessage *mess);
virtual void HandleFork(TMessage *mess);
virtual void HandleLibIncPath(TMessage *mess);
virtual void HandleProcess(TMessage *mess, TString *slb = 0);
virtual void HandleQueryList(TMessage *mess);
virtual void HandleRemove(TMessage *mess, TString *slb = 0);
virtual void HandleRetrieve(TMessage *mess, TString *slb = 0);
virtual Int_t HandleWorkerLists(TMessage *mess);
virtual void ProcessNext(TString *slb = 0);
virtual Int_t Setup();
Int_t SetupCommon();
virtual void MakePlayer();
virtual void DeletePlayer();
virtual Int_t Fork();
Int_t GetSessionStatus();
Bool_t IsIdle();
Bool_t UnlinkDataDir(const char *path);
static TString fgLastMsg; // Message about status before exception
static Long64_t fgLastEntry; // Last entry before exception
public:
TProofServ(Int_t *argc, char **argv, FILE *flog = 0);
virtual ~TProofServ();
virtual Int_t CreateServer();
TProof *GetProof() const { return fProof; }
const char *GetService() const { return fService; }
const char *GetConfDir() const { return fConfDir; }
const char *GetConfFile() const { return fConfFile; }
const char *GetUser() const { return fUser; }
const char *GetGroup() const { return fGroup; }
const char *GetWorkDir() const { return fWorkDir; }
const char *GetImage() const { return fImage; }
const char *GetSessionTag() const { return fSessionTag; }
const char *GetTopSessionTag() const { return fTopSessionTag; }
const char *GetSessionDir() const { return fSessionDir; }
const char *GetPackageDir() const { return fPackageDir; }
const char *GetDataDir() const { return fDataDir; }
const char *GetDataDirOpts() const { return fDataDirOpts; }
Int_t GetProtocol() const { return fProtocol; }
const char *GetOrdinal() const { return fOrdinal; }
Int_t GetGroupId() const { return fGroupId; }
Int_t GetGroupSize() const { return fGroupSize; }
Int_t GetLogLevel() const { return fLogLevel; }
TSocket *GetSocket() const { return fSocket; }
Float_t GetRealTime() const { return fRealTime; }
Float_t GetCpuTime() const { return fCpuTime; }
Int_t GetQuerySeqNum() const { return fQuerySeqNum; }
Int_t GetTotSessions() const { return fTotSessions; }
Int_t GetActSessions() const { return fActSessions; }
Float_t GetEffSessions() const { return fEffSessions; }
void GetOptions(Int_t *argc, char **argv);
TList *GetEnabledPackages() const { return fEnabledPackages; }
Int_t GetInflateFactor() const { return fInflateFactor; }
static Long_t GetVirtMemMax();
static Long_t GetResMemMax();
static Float_t GetMemHWM();
static Float_t GetMemStop();
Long64_t GetMsgSizeHWM() const { return fMsgSizeHWM; }
const char *GetPrefix() const { return fPrefix; }
void FlushLogFile();
void TruncateLogFile(); // Called also by TDSetProxy::Next()
TProofLockPath *GetCacheLock() { return fCacheLock; } //cache dir locker; used by TProofPlayer
Int_t CopyFromCache(const char *name, Bool_t cpbin);
Int_t CopyToCache(const char *name, Int_t opt = 0);
virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange,
Bool_t resume = kFALSE);
virtual void HandleException(Int_t sig);
virtual Int_t HandleSocketInput(TMessage *mess, Bool_t all);
virtual void HandleSocketInput();
virtual void HandleUrgentData();
virtual void HandleSigPipe();
virtual void HandleTermination() { Terminate(0); }
void Interrupt() { fInterrupt = kTRUE; }
Bool_t IsEndMaster() const { return fEndMaster; }
Bool_t IsMaster() const { return fMasterServ; }
Bool_t IsParallel() const;
Bool_t IsTopMaster() const { return fOrdinal == "0"; }
void Run(Bool_t retrn = kFALSE);
void Print(Option_t *option="") const;
void RestartComputeTime();
TObject *Get(const char *namecycle);
TDSetElement *GetNextPacket(Long64_t totalEntries = -1);
virtual void ReleaseWorker(const char *) { }
void Reset(const char *dir);
Int_t ReceiveFile(const char *file, Bool_t bin, Long64_t size);
virtual Int_t SendAsynMessage(const char *msg, Bool_t lf = kTRUE);
virtual void SendLogFile(Int_t status = 0, Int_t start = -1, Int_t end = -1);
void SendStatistics();
void SendParallel(Bool_t async = kFALSE);
Int_t UpdateSessionStatus(Int_t xst = -1);
// Disable / Enable read timeout
virtual void DisableTimeout() { }
virtual void EnableTimeout() { }
virtual void Terminate(Int_t status);
// Log control
void LogToMaster(Bool_t on = kTRUE) { fSendLogToMaster = on; }
static FILE *SetErrorHandlerFile(FILE *ferr);
static void ErrorHandler(Int_t level, Bool_t abort, const char *location,
const char *msg);
static void ResolveKeywords(TString &fname, const char *path = 0);
static void SetLastMsg(const char *lastmsg);
static void SetLastEntry(Long64_t lastentry);
// To handle local data server related paths
static void FilterLocalroot(TString &path, const char *url = "root://dum/");
static void GetLocalServer(TString &dsrv);
// To prepara ethe map of files to process
static TMap *GetDataSetNodeMap(TFileCollection *fc, TString &emsg);
static Int_t RegisterDataSets(TList *in, TList *out, TDataSetManager *dsm, TString &e);
static Bool_t IsActive();
static TProofServ *This();
ClassDef(TProofServ,0) //PROOF Server Application Interface
};
R__EXTERN TProofServ *gProofServ;
class TProofLockPath : public TNamed {
private:
Int_t fLockId; //file id of dir lock
public:
TProofLockPath(const char *path) : TNamed(path,path), fLockId(-1) { }
~TProofLockPath() { if (IsLocked()) Unlock(); }
Int_t Lock();
Int_t Unlock();
Bool_t IsLocked() const { return (fLockId > -1); }
};
class TProofLockPathGuard {
private:
TProofLockPath *fLocker; //locker instance
public:
TProofLockPathGuard(TProofLockPath *l) { fLocker = l; if (fLocker) fLocker->Lock(); }
~TProofLockPathGuard() { if (fLocker) fLocker->Unlock(); }
};
//----- Handles output from commands executed externally via a pipe. ---------//
//----- The output is redirected one level up (i.e., to master or client). ---//
//______________________________________________________________________________
class TProofServLogHandler : public TFileHandler {
private:
TSocket *fSocket; // Socket where to redirect the message
FILE *fFile; // File connected with the open pipe
TString fPfx; // Prefix to be prepended to messages
static TString fgPfx; // Default prefix to be prepended to messages
static Int_t fgCmdRtn; // Return code of the command execution (available only
// after closing the pipe)
public:
enum EStatusBits { kFileIsPipe = BIT(23) };
TProofServLogHandler(const char *cmd, TSocket *s, const char *pfx = "");
TProofServLogHandler(FILE *f, TSocket *s, const char *pfx = "");
virtual ~TProofServLogHandler();
Bool_t IsValid() { return ((fFile && fSocket) ? kTRUE : kFALSE); }
Bool_t Notify();
Bool_t ReadNotify() { return Notify(); }
static void SetDefaultPrefix(const char *pfx);
static Int_t GetCmdRtn();
};
//--- Guard class: close pipe, deactivatethe related descriptor --------------//
//______________________________________________________________________________
class TProofServLogHandlerGuard {
private:
TProofServLogHandler *fExecHandler;
public:
TProofServLogHandlerGuard(const char *cmd, TSocket *s,
const char *pfx = "", Bool_t on = kTRUE);
TProofServLogHandlerGuard(FILE *f, TSocket *s,
const char *pfx = "", Bool_t on = kTRUE);
virtual ~TProofServLogHandlerGuard();
};
//--- Special timer to control delayed shutdowns
//______________________________________________________________________________
class TShutdownTimer : public TTimer {
private:
TProofServ *fProofServ;
Int_t fTimeout;
public:
TShutdownTimer(TProofServ *p, Int_t delay);
Bool_t Notify();
};
//--- Synchronous timer used to reap children processes change of state
//______________________________________________________________________________
class TReaperTimer : public TTimer {
private:
TList *fChildren; // List of children (forked) processes
public:
TReaperTimer(Long_t frequency = 1000) : TTimer(frequency, kTRUE), fChildren(0) { }
virtual ~TReaperTimer();
void AddPid(Int_t pid);
Bool_t Notify();
};
//--- Special timer to terminate idle sessions
//______________________________________________________________________________
class TIdleTOTimer : public TTimer {
private:
TProofServ *fProofServ;
public:
TIdleTOTimer(TProofServ *p, Int_t delay) : TTimer(delay, kTRUE), fProofServ(p) { }
Bool_t Notify();
};
//______________________________________________________________________________
class TIdleTOTimerGuard {
private:
TIdleTOTimer *fIdleTOTimer;
public:
TIdleTOTimerGuard(TIdleTOTimer *t) : fIdleTOTimer(t) { if (fIdleTOTimer) fIdleTOTimer->Stop(); }
virtual ~TIdleTOTimerGuard() { if (fIdleTOTimer) fIdleTOTimer->Start(-1, kTRUE); }
};
//______________________________________________________________________________
inline Int_t TProofServ::GetCompressionLevel() const
{
return (fCompressMsg < 0) ? -1 : fCompressMsg % 100;
}
#endif
|