/usr/include/root/TPacketizerAdaptive.h is in libroot-proof-proofplayer-dev 5.34.30-0ubuntu8.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
// @(#)root/proofplayer:$Id$
// Author: Jan Iwaszkiewicz 11/12/06
/*************************************************************************
* Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
* All rights reserved. *
* *
* For the licensing terms see $ROOTSYS/LICENSE. *
* For the list of contributors see $ROOTSYS/README/CREDITS. *
*************************************************************************/
#ifndef ROOT_TPacketizerAdaptive
#define ROOT_TPacketizerAdaptive
//////////////////////////////////////////////////////////////////////////
// //
// TPacketizerAdaptive //
// //
// This packetizer is based on TPacketizer but uses different //
// load-balancing algorithms and data structures. //
// Two main improvements in the load-balancing strategy: //
// - The first was to change the order in which the files are assigned //
// to the computing nodes in such a way that network transfers are //
// evenly distributed over the query time. Transfer of the remote //
// files was often becoming a bottleneck at the end of a query. //
// - The other improvement is the use of a time-based packet size. We //
// measure the processing rate of all the nodes and calculate the //
// packet size so that each packet takes a certain amount of time. //
// In this way the packetizer prevents the situation where the query //
// can't finish because of one slow node. //
// //
// The data structures: TFileStat, TFileNode and TSlaveStat are //
// enriched + changed and TFileNode::Compare method is changed. //
// //
//////////////////////////////////////////////////////////////////////////
#ifndef ROOT_TVirtualPacketizer
#include "TVirtualPacketizer.h"
#endif
class TMessage;
class TTree;
class TMap;
class TNtupleD;
class TProofStats;
class TRandom;
class TSortedList;
// Packetizer derived from TVirtualPacketizer. Per the file header it is based
// on TPacketizer but uses different load-balancing algorithms and data
// structures, including a time-based packet size.
// Only declarations are visible in this header; no member function has a body here.
class TPacketizerAdaptive : public TVirtualPacketizer {
public: // public because of Sun CC bug
// Helper classes; only forward-declared in this header.
class TFileNode;
class TFileStat;
class TSlaveStat;
private:
TList *fFileNodes; // nodes with files
TList *fUnAllocated; // nodes with unallocated files
TList *fActive; // nodes with unfinished files
Int_t fMaxPerfIdx; // maximum of our slaves' performance index
TList *fPartitions; // list of partitions on nodes
TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed
Bool_t fCachePacketSync; // control synchronization of cache and packet sizes
Double_t fMaxEntriesRatio; // max file entries to avg allowed ratio for cache-to-packet sync
Float_t fFractionOfRemoteFiles; // fraction of TDSetElements that are on non-workers
Long64_t fNEventsOnRemLoc; // number of events in currently
// unallocated files on non-worker locations
Float_t fBaseLocalPreference; // indicates how much more likely the nodes will be
// to open their local files (1 means indifferent)
Bool_t fForceLocal; // if 1 - eliminate the remote processing
Long_t fMaxSlaveCnt; // maximum number of workers per filenode (Long_t to avoid
// warnings from backward compatibility support)
Int_t fPacketAsAFraction; // used to calculate the packet size
// fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves)
// fPacketAsAFraction can be interpreted as follows:
// assuming all slaves have equal processing rate, packet size
// is (#events processed by 1 slave) / fPacketAsAFraction.
// It can be set with PROOF_PacketAsAFraction in input list.
Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
Int_t fTryReassign; // Controls attempts to reassign packets (0 == no reassignment)
// Default construction, copy construction and assignment are private, so
// they are not available to code outside the class.
TPacketizerAdaptive();
TPacketizerAdaptive(const TPacketizerAdaptive&); // no implementation, will generate an error on accidental usage
void InitStats(); // initialise the stats
void operator=(const TPacketizerAdaptive&); // no implementation, will generate an error on accidental usage
// Private helpers working on file nodes (TFileNode) and per-file records (TFileStat).
// NOTE(review): their exact semantics are not visible in this header; the
// names suggest they walk and prune the fUnAllocated / fActive lists —
// confirm against the implementation file.
TFileNode *NextNode();
void RemoveUnAllocNode(TFileNode *);
TFileNode *NextActiveNode();
void RemoveActiveNode(TFileNode *);
// Both arguments are optional and default to 0 (null).
TFileStat *GetNextUnAlloc(TFileNode *node = 0, const char *nodeHostName = 0);
TFileStat *GetNextActive();
void RemoveActive(TFileStat *file);
void Reset();
// maxent defaults to -1 and byfile to kFALSE.
// NOTE(review): -1 presumably means "no entry limit" — confirm against the implementation.
void ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent = -1, Bool_t byfile = kFALSE);
// NOTE(review): listOfMissingFiles is a pointer-to-pointer, so it looks like
// an in/out list the callee may create or extend — confirm ownership.
Int_t ReassignPacket(TDSetElement *e, TList **listOfMissingFiles);
void SplitPerHost(TList *elements, TList **listOfMissingFiles);
public:
// The only publicly accessible constructor.
// NOTE(review): 'first' and 'num' presumably select the entry range to
// process and 'input' is the input list mentioned for
// PROOF_PacketAsAFraction above — confirm against the implementation.
TPacketizerAdaptive(TDSet *dset, TList *slaves, Long64_t first, Long64_t num,
TList *input, TProofProgressStatus *st);
virtual ~TPacketizerAdaptive();
// Public packetizer operations.
// NOTE(review): presumably these override the TVirtualPacketizer interface;
// the base class is not visible here — confirm.
// listOfMissingFiles is optional here (defaults to 0).
Int_t AddProcessed(TSlave *sl, TProofProgressStatus *st,
Double_t latency, TList **listOfMissingFiles = 0);
// ent, bytes and calls are non-const references, i.e. output (or in/out) parameters.
Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls);
// 'all' is a non-const reference, i.e. an output (or in/out) flag.
Float_t GetCurrentRate(Bool_t &all);
Int_t CalculatePacketSize(TObject *slstat, Long64_t cachesz, Int_t learnent);
TDSetElement *GetNextPacket(TSlave *sl, TMessage *r);
void MarkBad(TSlave *s, TProofProgressStatus *status, TList **missingFiles);
Int_t GetActiveWorkers();
ClassDef(TPacketizerAdaptive,0) //Generate work packets for parallel processing
};
#endif
// end of ROOT_TPacketizerAdaptive include guard