/usr/include/root/TVirtualPacketizer.h is in libroot-proof-proofplayer-dev 5.34.30-0ubuntu8.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
// @(#)root/proofplayer:$Id$
// Author: Maarten Ballintijn 9/7/2002
/*************************************************************************
* Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
#ifndef ROOT_TVirtualPacketizer
#define ROOT_TVirtualPacketizer
//////////////////////////////////////////////////////////////////////////
// //
// TVirtualPacketizer //
// //
// Packetizer is a load balancing object created for each query. //
// It generates packets to be processed on PROOF worker servers. //
// A packet is an event range (begin entry and number of entries) or //
// object range (first object and number of objects) in a TTree //
// (entries) or a directory (objects) in a file. //
// Packets are generated taking into account the performance of the //
// remote machine, the time it took to process a previous packet on //
// the remote machine, the locality of the database files, etc. //
// //
// TVirtualPacketizer includes common parts of PROOF packetizers. //
// Look in subclasses for details. //
// The default packetizer is TPacketizerAdaptive. //
// To use an alternative one, for instance - the TPacketizer, call: //
// proof->SetParameter("PROOF_Packetizer", "TPacketizer"); //
// //
//////////////////////////////////////////////////////////////////////////
#ifndef ROOT_TObject
#include "TObject.h"
#endif
#ifndef ROOT_TSlave
#include "TSlave.h"
#endif
#ifndef ROOT_TProofProgressStatus
#include "TProofProgressStatus.h"
#endif
#ifndef ROOT_TTime
#include "TTime.h"
#endif
class TDSet;
class TDSetElement;
class TList;
class TMap;
class TMessage;
class TNtuple;
class TNtupleD;
class TProofProgressInfo;
class TSlave;
class TVirtualPacketizer : public TObject {
public: // public because of Sun CC bug
class TVirtualSlaveStat;
protected:
enum EUseEstOpt { // Option for usage of estimated values
kEstOff = 0,
kEstCurrent = 1,
kEstAverage = 2
};
// General configuration parameters
Double_t fMinPacketTime; // minimum packet time
Double_t fMaxPacketTime; // maximum packet time
TList *fConfigParams; // List of configuration parameters
TMap *fSlaveStats; // slave status, keyed by correspondig TSlave
TProofProgressStatus *fProgressStatus; // pointer to status in the player.
TTimer *fProgress; // progress updates timer
Long64_t fTotalEntries; // total number of entries to be distributed;
// not used in the progressive packetizer
TList *fFailedPackets;// a list of packets that failed while processing
// Members for progress info
TTime fStartTime; // time offset
Float_t fInitTime; // time before processing
Float_t fProcTime; // time since start of processing
Float_t fTimeUpdt; // time between updates
TNtupleD *fCircProg; // Keeps circular info for "instantenous"
// rate calculations
Long_t fCircN; // Circularity
TNtuple *fProgressPerf; // {Active workers, evt rate, MBs read} as a function of processing time
Float_t fProcTimeLast; // Time of the last measurement
Int_t fActWrksLast; // Active workers at fProcTimeLast
Float_t fEvtRateLast; // Evt rate at fProcTimeLast
Float_t fMBsReadLast; // MBs read at fProcTimeLast
Float_t fEffSessLast; // Number of effective sessions at fProcTimeLast
Bool_t fAWLastFill; // Whether to fill the last measurement
Float_t fReportPeriod; // Time between reports if nothing changes (estimated proc time / 100)
EUseEstOpt fUseEstOpt; // Control usage of estimated values for the progress info
Bool_t fValid; // Constructed properly?
Bool_t fStop; // Termination of Process() requested?
TString fDataSet; // Name of the dataset being processed (for dataset-driven runs)
TList *fInput; // Input list
TVirtualPacketizer(TList *input, TProofProgressStatus *st = 0);
TVirtualPacketizer(const TVirtualPacketizer &); // no implementation, will generate
void operator=(const TVirtualPacketizer &); // error on accidental usage
TDSetElement *CreateNewPacket(TDSetElement* base, Long64_t first, Long64_t num);
Long64_t GetEntries(Bool_t tree, TDSetElement *e); // Num of entries or objects
virtual Bool_t HandleTimer(TTimer *timer);
public:
enum EStatusBits { kIsInitializing = BIT(16), kIsDone = BIT(17), kIsTree = BIT(18) };
virtual ~TVirtualPacketizer();
Bool_t IsValid() const { return fValid; }
Long64_t GetEntriesProcessed() const { return (fProgressStatus? fProgressStatus->GetEntries() : 0); }
virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
{ ent = GetEntriesProcessed(); bytes = GetBytesRead(); calls = GetReadCalls(); return 0; }
virtual Float_t GetCurrentRate(Bool_t &all) { all = kTRUE; return (fProgressStatus? fProgressStatus->GetCurrentRate() : 0.); }
Long64_t GetTotalEntries() const { return fTotalEntries; }
virtual TDSetElement *GetNextPacket(TSlave *sl, TMessage *r);
virtual void SetInitTime();
virtual void StopProcess(Bool_t abort, Bool_t stoptimer = kFALSE);
TList *GetFailedPackets() { return fFailedPackets; }
void SetFailedPackets(TList *list) { fFailedPackets = list; }
virtual Int_t AddWorkers(TList *workers);
Long64_t GetBytesRead() const { return (fProgressStatus? fProgressStatus->GetBytesRead() : 0); }
Long64_t GetReadCalls() const { return (fProgressStatus? fProgressStatus->GetReadCalls() : 0); }
Double_t GetCumProcTime() const { return fProgressStatus->GetProcTime(); }
Float_t GetInitTime() const { return fInitTime; }
Float_t GetProcTime() const { return fProcTime; }
TNtuple *GetProgressPerf(Bool_t steal = kFALSE) { if (steal) { TNtuple *n = fProgressPerf; fProgressPerf = 0; return n;
} else { return fProgressPerf;} }
TList *GetConfigParams(Bool_t steal = kFALSE) { if (steal) { TList *l = fConfigParams; fConfigParams = 0; return l;
} else { return fConfigParams;} }
virtual void MarkBad(TSlave * /*s*/, TProofProgressStatus * /*status*/, TList ** /*missingFiles*/) { return; }
virtual Int_t AddProcessed(TSlave * /*sl*/, TProofProgressStatus * /*st*/,
Double_t /*lat*/, TList ** /*missingFiles*/) { return 0; }
TProofProgressStatus *GetStatus() { return fProgressStatus; }
void SetProgressStatus(TProofProgressStatus *st) { fProgressStatus = st; }
void SetTotalEntries(Long64_t ent) { fTotalEntries = ent; }
TMap *GetSlaveStats() const { return fSlaveStats; }
virtual Int_t GetActiveWorkers() { return -1; }
ClassDef(TVirtualPacketizer,0) //Generate work packets for parallel processing
};
//------------------------------------------------------------------------------
// Per-worker record used by the packetizers: the worker's FQDN, its TSlave
// and a progress status. The class is abstract (AddProcessed() is pure
// virtual); TPacketizerAdaptive and TPacketizer are friends and may access
// the protected members directly.
class TVirtualPacketizer::TVirtualSlaveStat : public TObject {

friend class TPacketizerAdaptive;
friend class TPacketizer;

protected:
   TString       fWrkFQDN;        // Worker FQDN
   TSlave       *fSlave;          // corresponding TSlave record
   TProofProgressStatus *fStatus; // status as of the last finished packet

   // Start with null pointers so that the null checks in the accessors below
   // are meaningful even before a derived class has assigned the members.
   TVirtualSlaveStat() : fSlave(0), fStatus(0) { }

public:
   const char *GetName() const { return fWrkFQDN.Data(); }

   // NOTE(review): dereferences fSlave unconditionally; assumes the derived
   // class has set it - confirm against the subclasses.
   const char *GetOrdinal() const { return fSlave->GetOrdinal(); }

   // Entries / processing time from the status, or -1 when no status is set.
   Long64_t    GetEntriesProcessed() const { return fStatus ? fStatus->GetEntries() : -1; }
   Double_t    GetProcTime() const { return fStatus ? fStatus->GetProcTime() : -1; }

   // Rate from the status. Guarded against a missing status like the two
   // accessors above; 0 is returned in that case, matching what
   // TVirtualPacketizer::GetCurrentRate() reports when it has no status.
   Float_t     GetAvgRate() { return fStatus ? fStatus->GetRate() : 0.; }

   TProofProgressStatus *GetProgressStatus() { return fStatus; }

   // To be implemented by the concrete per-worker statistics classes.
   virtual TProofProgressStatus *AddProcessed(TProofProgressStatus *st) = 0;
};
#endif
|