/usr/lib/python2.7/dist-packages/paraview/coprocessing.py is in paraview-python 4.0.1-1ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
r"""
This module is designed for use in co-processing Python scripts. It provides a
class, CoProcessor, which is designed to be used as the base class for Python
co-processing pipelines. Additionally, this module has several other utility
functions that are appropriate for co-processing.
"""
from paraview import simple
from paraview.vtk import vtkPVVTKExtensionsCore
import math
# -----------------------------------------------------------------------------
def IsInModulo(timestep, frequencyArray):
    """
    Tell whether ``timestep`` is a multiple of at least one given frequency.

    For example::

        isFM = IsInModulo(timestep, [2, 3, 7])

    evaluates like::

        isFM = (timestep % 2 == 0) or (timestep % 3 == 0) or (timestep % 7 == 0)

    The frequencies are tried in order and the search stops at the first
    match. An empty ``frequencyArray`` yields False.
    """
    return any(timestep % period == 0 for period in frequencyArray)
class CoProcessor(object):
    """Base class for co-processing Pipelines. paraview.cpstate Module can
    be used to dump out ParaView states as co-processing pipelines. Those are
    typically subclasses of this. The subclasses must provide an
    implementation for the CreatePipeline() method."""

    def __init__(self):
        self.__PipelineCreated = False
        self.__ProducersMap = {}
        self.__WritersList = []
        self.__ViewsList = []
        self.__EnableLiveVisualization = False
        self.__LiveVisualizationLink = None
        # Start with no update frequencies so that LoadRequestedData() simply
        # requests nothing (instead of failing on a missing attribute) when
        # SetUpdateFrequencies() has not been called yet.
        self.__Frequencies = {}

    def SetUpdateFrequencies(self, frequencies):
        """Set the frequencies at which the pipeline needs to be updated.
        Typically, this is called by the subclass once it has determined what
        timesteps co-processing will be needed to be done.

        frequencies is a map, with key->string name of the simulation
        input, and value is a list of frequencies.

        Raises RuntimeError if frequencies is not a dict (dict subclasses are
        accepted).
        """
        if not isinstance(frequencies, dict):
            raise RuntimeError(
                "Incorrect argument type: %s, must be a dict" % type(frequencies))
        self.__Frequencies = frequencies

    def EnableLiveVisualization(self, enable):
        """Call this method to enable live-visualization. When enabled,
        DoLiveVisualization() will communicate with ParaView server if possible
        for live visualization."""
        self.__EnableLiveVisualization = enable

    def CreatePipeline(self, datadescription):
        """This method must be overridden by subclasses to create the
        visualization pipeline. The base implementation always raises
        RuntimeError."""
        raise RuntimeError("Subclasses must override this method.")

    def LoadRequestedData(self, datadescription):
        """Call this method in RequestDataDescription co-processing pass to mark
        the datadescription with information about what fields and grids are
        required for this pipeline for the given timestep, if any.

        Default implementation uses the update-frequencies set using
        SetUpdateFrequencies() to determine if the current timestep needs to
        be processed and then requests all fields. Inputs with no registered
        frequencies are left switched off. Subclasses can override
        this method to provide additional customizations."""
        timestep = datadescription.GetTimeStep()
        num_inputs = datadescription.GetNumberOfInputDescriptions()

        # first set all inputs to be off and set to on as needed.
        # we don't use vtkCPInputDataDescription::Reset() because
        # that will reset any field names that were added in.
        for cc in range(num_inputs):
            inputdesc = datadescription.GetInputDescription(cc)
            inputdesc.AllFieldsOff()
            inputdesc.GenerateMeshOff()

        for cc in range(num_inputs):
            input_name = datadescription.GetInputDescriptionName(cc)
            freqs = self.__Frequencies.get(input_name, [])
            if freqs and IsInModulo(timestep, freqs):
                inputdesc = datadescription.GetInputDescription(cc)
                inputdesc.AllFieldsOn()
                inputdesc.GenerateMeshOn()

    def UpdateProducers(self, datadescription):
        """This method will update the producers in the pipeline. If the
        pipeline is not created, it will be created using
        self.CreatePipeline().
        """
        if not self.__PipelineCreated:
            self.CreatePipeline(datadescription)
            self.__PipelineCreated = True

        simtime = datadescription.GetTime()
        # Hand every registered producer the current grid of its simulation
        # input, stamped with the current simulation time.
        for name, producer in self.__ProducersMap.items():
            producer.GetClientSideObject().SetOutput(
                datadescription.GetInputDescriptionByName(name).GetGrid(),
                simtime)

    def WriteData(self, datadescription):
        """This method will update all writers present in the pipeline, as
        needed, to generate the output data files, respecting the
        write-frequencies set on the writers. A writer also runs when the
        data description reports forced output."""
        timestep = datadescription.GetTimeStep()
        for writer in self.__WritersList:
            if (timestep % writer.cpFrequency) == 0 or \
                    datadescription.GetForceOutput() == True:
                # "%t" in the file name template stands for the timestep.
                writer.FileName = writer.cpFileName.replace("%t", str(timestep))
                writer.UpdatePipeline(datadescription.GetTime())

    def WriteImages(self, datadescription, rescale_lookuptable=False):
        """This method will update all views, if present and write output
        images, as needed. When rescale_lookuptable is true, the lookup
        tables used in a view are rescaled with RescaleDataRange() before the
        image is written."""
        timestep = datadescription.GetTimeStep()

        for view in self.__ViewsList:
            if timestep % view.cpFrequency == 0 or \
                    datadescription.GetForceOutput() == True:
                # "%t" in the file name template stands for the timestep.
                fname = view.cpFileName.replace("%t", str(timestep))
                if view.cpFitToScreen != 0:
                    if view.IsA("vtkSMRenderViewProxy") == True:
                        view.ResetCamera()
                    elif view.IsA("vtkSMContextViewProxy") == True:
                        view.ResetDisplay()
                    else:
                        print(' do not know what to do with a  %s'
                              % (view.GetClassName(),))
                view.ViewTime = datadescription.GetTime()
                if rescale_lookuptable:
                    self.RescaleDataRange(view, datadescription.GetTime())
                simple.WriteImage(fname, view, Magnification=view.cpMagnification)

    def DoLiveVisualization(self, datadescription, hostname, port):
        """This method executes the code-stub needed to communicate with ParaView
        for live-visualization. Call this method only if you want to support
        live-visualization with your co-processing module. It does nothing
        unless live visualization was enabled with EnableLiveVisualization()."""
        if not self.__EnableLiveVisualization:
            return

        # make sure the live insitu is initialized
        if not self.__LiveVisualizationLink:
            # Create the vtkLiveInsituLink i.e. the "link" to the visualization processes.
            from paraview import servermanager
            self.__LiveVisualizationLink = servermanager.vtkLiveInsituLink()

            # Tell vtkLiveInsituLink what host/port must it connect to for the visualization
            # process.
            self.__LiveVisualizationLink.SetHostname(hostname)
            self.__LiveVisualizationLink.SetInsituPort(int(port))

            # Initialize the "link"
            self.__LiveVisualizationLink.SimulationInitialize(
                servermanager.ActiveConnection.Session.GetSessionProxyManager())

        time = datadescription.GetTime()

        # For every new timestep, update the simulation state before proceeding.
        self.__LiveVisualizationLink.SimulationUpdate(time)

        # sources need to be updated by insitu code. vtkLiveInsituLink never updates
        # the pipeline, it simply uses the data available at the end of the pipeline,
        # if any.
        from paraview import simple
        for source in simple.GetSources().values():
            source.UpdatePipeline(time)

        # push extracts to the visualization process.
        self.__LiveVisualizationLink.SimulationPostProcess(time)

    def CreateProducer(self, datadescription, inputname):
        """Creates a producer proxy for the grid. This method is generally used in
        CreatePipeline() call to create producers.

        Raises RuntimeError if datadescription has no input named inputname."""
        # Check that the producer name for the input given is valid for the
        # current setup.
        if not datadescription.GetInputDescriptionByName(inputname):
            raise RuntimeError(
                "Simulation input name '%s' does not exist" % inputname)

        grid = datadescription.GetInputDescriptionByName(inputname).GetGrid()

        producer = simple.PVTrivialProducer()
        # we purposefully don't set the time for the PVTrivialProducer here.
        # when we update the pipeline we will do it then.
        producer.GetClientSideObject().SetOutput(grid)

        # Structured data sets additionally carry the whole extent reported
        # by the input description.
        if grid.IsA("vtkImageData") == True or \
                grid.IsA("vtkStructuredGrid") == True or \
                grid.IsA("vtkRectilinearGrid") == True:
            extent = datadescription.GetInputDescriptionByName(inputname).GetWholeExtent()
            producer.WholeExtent = [extent[0], extent[1], extent[2],
                                    extent[3], extent[4], extent[5]]

        # Save the producer for easy access in UpdateProducers() call.
        self.__ProducersMap[inputname] = producer
        producer.UpdatePipeline()
        return producer

    def CreateWriter(self, proxy_ctor, filename, freq):
        """Creates a writer proxy. This method is generally used in
        CreatePipeline() to create writers. All writers created as such will
        write the output files appropriately when WriteData() is called."""
        writer = proxy_ctor()
        writer.FileName = filename
        writer.add_attribute("cpFrequency", freq)
        writer.add_attribute("cpFileName", filename)
        self.__WritersList.append(writer)
        return writer

    def CreateView(self, proxy_ctor, filename, freq, fittoscreen, magnification, width, height):
        """Create a CoProcessing view for image capture with extra meta-data
        such as magnification, size and frequency."""
        view = proxy_ctor()
        view.add_attribute("cpFileName", filename)
        view.add_attribute("cpFrequency", freq)
        view.add_attribute("cpFitToScreen", fittoscreen)
        view.add_attribute("cpMagnification", magnification)
        view.ViewSize = [width, height]
        self.__ViewsList.append(view)
        return view

    @staticmethod
    def _RescaleRGBPoints(rgbpoints, newmin, newmax):
        """Remap the scalar positions of a flat list of color control points.

        rgbpoints is read in groups of four values; only the first value of
        each group (indices 0, 4, 8, ...) is rewritten, the other three are
        copied unchanged.

        Returns a new list whose first position is newmin and whose last
        position is newmax, or None when nothing has to change: the list holds
        no complete point, the end points already equal newmin/newmax, or the
        new range is zero.
        """
        numpts = len(rgbpoints) // 4
        if numpts == 0:
            return None
        last = (numpts - 1) * 4
        if newmin == rgbpoints[0] and newmax == rgbpoints[last]:
            return None

        newrange = newmax - newmin
        # only readjust if the new range isn't zero.
        if newrange == 0:
            return None

        oldrange = rgbpoints[last] - rgbpoints[0]
        newrgbpoints = list(rgbpoints)
        if oldrange != 0:
            # keep the relative distribution of the old points
            for v in range(numpts - 1):
                newrgbpoints[v * 4] = newmin + \
                    (rgbpoints[v * 4] - rgbpoints[0]) * newrange / oldrange
        else:
            # the old range is 0 so the best we can do is to space the new
            # points evenly
            intervals = float(max(numpts - 1, 1))
            for v in range(numpts):
                newrgbpoints[v * 4] = newmin + v * newrange / intervals
        if numpts > 1:
            # avoid numerical round-off, at least with the last point
            newrgbpoints[last] = newmax
        return newrgbpoints

    def RescaleDataRange(self, view, time):
        """DataRange can change across time, sometime we want to rescale the
        color map to match to the closer actual data range.

        For every visible representation of view that maps scalars through a
        lookup table, the range of the color array is computed, combined over
        all processes with an AllReduce, and the lookup table's RGBPoints are
        rescaled to that range."""
        for rep in view.Representations:
            if not hasattr(rep, 'Visibility') or \
                    not rep.Visibility or \
                    not hasattr(rep, 'MapScalars') or \
                    not rep.MapScalars or \
                    not rep.LookupTable:
                # rep is either not visible or not mapping scalars using a LUT.
                continue

            source = rep.Input
            source.UpdatePipeline(time)  # make sure range is up-to-date
            lut = rep.LookupTable
            if rep.ColorAttributeType == 'POINT_DATA':
                datainformation = source.GetPointDataInformation()
            elif rep.ColorAttributeType == 'CELL_DATA':
                datainformation = source.GetCellDataInformation()
            else:
                print('something strange with color attribute type %s'
                      % (rep.ColorAttributeType,))
                # No data information to read a range from; skip this
                # representation instead of using an unbound or stale one.
                continue

            arrayinfo = datainformation.GetArray(rep.ColorArrayName)
            if arrayinfo is None:
                # there is no array on this process. it's possible
                # that this process has no points or cells
                # NOTE(review): this skips the AllReduce below on this
                # process only - confirm that cannot stall the other ranks.
                continue

            numcomps = arrayinfo.GetNumberOfComponents()
            if lut.VectorMode != 'Magnitude' or numcomps == 1:
                datarange = arrayinfo.GetRange(lut.VectorComponent)
            else:
                # Combine the per-component range ends as a Euclidean norm.
                datarange = [0, 0]
                for i in range(numcomps):
                    comprange = arrayinfo.GetRange(i)
                    for j in range(2):
                        datarange[j] += comprange[j] * comprange[j]
                datarange[0] = math.sqrt(datarange[0])
                datarange[1] = math.sqrt(datarange[1])

            import vtkParallelCorePython
            import paraview.vtk as vtk
            import paraview.servermanager
            pm = paraview.servermanager.vtkProcessModule.GetProcessModule()
            globalController = pm.GetGlobalController()
            localarray = vtk.vtkDoubleArray()
            localarray.SetNumberOfTuples(2)
            # negate so that MPI_MAX gets min instead of doing a MPI_MIN and MPI_MAX
            localarray.SetValue(0, -datarange[0])
            localarray.SetValue(1, datarange[1])
            globalarray = vtk.vtkDoubleArray()
            globalarray.SetNumberOfTuples(2)
            globalController.AllReduce(localarray, globalarray, 0)
            globaldatarange = [-globalarray.GetValue(0), globalarray.GetValue(1)]

            newrgbpoints = self._RescaleRGBPoints(
                lut.RGBPoints.GetData(), globaldatarange[0], globaldatarange[1])
            if newrgbpoints is not None:
                lut.RGBPoints.SetData(newrgbpoints)
|