This file is indexed.

/usr/lib/python3/dist-packages/sasmodels/kernelpy.py is in python3-sasmodels 0.97~git20171104-2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
"""
Python driver for python kernels

Calls the kernel with a vector of $q$ values for a single parameter set.
Polydispersity is supported by looping over different parameter sets and
summing the results.  The interface to :class:`PyModel` matches those for
:class:`kernelcl.GpuModel` and :class:`kerneldll.DllModel`.
"""
from __future__ import division, print_function

import logging

import numpy as np  # type: ignore
from numpy import pi, sin, cos  #type: ignore

from . import details
from .generate import F64
from .kernel import KernelModel, Kernel

try:
    from typing import Union, Callable
except ImportError:
    pass
else:
    DType = Union[None, str, np.dtype]

class PyModel(KernelModel):
    """
    Wrapper for pure python models.
    """
    def __init__(self, model_info):
        # Make sure Iq and Iqxy are available and vectorized
        _create_default_functions(model_info)
        self.info = model_info
        self.dtype = np.dtype('d')
        logging.info("load python model " + self.info.name)

    def make_kernel(self, q_vectors):
        q_input = PyInput(q_vectors, dtype=F64)
        kernel = self.info.Iqxy if q_input.is_2d else self.info.Iq
        return PyKernel(kernel, self.info, q_input)

    def release(self):
        """
        Free resources associated with the model.
        """
        pass

class PyInput(object):
    """
    Make q data available to the gpu.

    *q_vectors* is a list of q vectors, which will be *[q]* for 1-D data,
    and *[qx, qy]* for 2-D data.  Internally, the vectors will be reallocated
    to get the best performance on OpenCL, which may involve shifting and
    stretching the array to better match the memory architecture.  Additional
    points will be evaluated with *q=1e-3*.

    *dtype* is the data type for the q vectors. The data type should be
    set to match that of the kernel, which is an attribute of
    :class:`GpuProgram`.  Note that not all kernels support double
    precision, so even if the program was created for double precision,
    the *GpuProgram.dtype* may be single precision.

    Call :meth:`release` when complete.  Even if not called directly, the
    buffer will be released when the data object is freed.
    """
    def __init__(self, q_vectors, dtype):
        self.nq = q_vectors[0].size
        self.dtype = dtype
        self.is_2d = (len(q_vectors) == 2)
        if self.is_2d:
            self.q = np.empty((self.nq, 2), dtype=dtype)
            self.q[:, 0] = q_vectors[0]
            self.q[:, 1] = q_vectors[1]
        else:
            self.q = np.empty(self.nq, dtype=dtype)
            self.q[:self.nq] = q_vectors[0]

    def release(self):
        """
        Free resources associated with the model inputs.
        """
        self.q = None

class PyKernel(Kernel):
    """
    Callable SAS kernel.

    *kernel* is the DllKernel object to call.

    *model_info* is the module information

    *q_input* is the DllInput q vectors at which the kernel should be
    evaluated.

    The resulting call method takes the *pars*, a list of values for
    the fixed parameters to the kernel, and *pd_pars*, a list of (value,weight)
    vectors for the polydisperse parameters.  *cutoff* determines the
    integration limits: any points with combined weight less than *cutoff*
    will not be calculated.

    Call :meth:`release` when done with the kernel instance.
    """
    def __init__(self, kernel, model_info, q_input):
        # type: (callable, ModelInfo, List[np.ndarray]) -> None
        self.dtype = np.dtype('d')
        self.info = model_info
        self.q_input = q_input
        self.res = np.empty(q_input.nq, q_input.dtype)
        self.kernel = kernel
        self.dim = '2d' if q_input.is_2d else '1d'

        partable = model_info.parameters
        kernel_parameters = (partable.iqxy_parameters if q_input.is_2d
                             else partable.iq_parameters)
        volume_parameters = partable.form_volume_parameters

        # Create an array to hold the parameter values.  There will be a
        # single array whose values are updated as the calculator goes
        # through the loop.  Arguments to the kernel and volume functions
        # will use views into this vector, relying on the fact that a
        # an array of no dimensions acts like a scalar.
        parameter_vector = np.empty(len(partable.call_parameters)-2, 'd')

        # Create views into the array to hold the arguments
        offset = 0
        kernel_args, volume_args = [], []
        for p in partable.kernel_parameters:
            if p.length == 1:
                # Scalar values are length 1 vectors with no dimensions.
                v = parameter_vector[offset:offset+1].reshape(())
            else:
                # Vector values are simple views.
                v = parameter_vector[offset:offset+p.length]
            offset += p.length
            if p in kernel_parameters:
                kernel_args.append(v)
            if p in volume_parameters:
                volume_args.append(v)

        # Hold on to the parameter vector so we can use it to call kernel later.
        # This may also be required to preserve the views into the vector.
        self._parameter_vector = parameter_vector

        # Generate a closure which calls the kernel with the views into the
        # parameter array.
        if q_input.is_2d:
            form = model_info.Iqxy
            qx, qy = q_input.q[:, 0], q_input.q[:, 1]
            self._form = lambda: form(qx, qy, *kernel_args)
        else:
            form = model_info.Iq
            q = q_input.q
            self._form = lambda: form(q, *kernel_args)

        # Generate a closure which calls the form_volume if it exists.
        form_volume = model_info.form_volume
        self._volume = ((lambda: form_volume(*volume_args)) if form_volume
                        else (lambda: 1.0))

    def __call__(self, call_details, values, cutoff, magnetic):
        # type: (CallDetails, np.ndarray, np.ndarray, float, bool) -> np.ndarray
        if magnetic:
            raise NotImplementedError("Magnetism not implemented for pure python models")
        #print("Calling python kernel")
        #call_details.show(values)
        res = _loops(self._parameter_vector, self._form, self._volume,
                     self.q_input.nq, call_details, values, cutoff)
        return res

    def release(self):
        # type: () -> None
        """
        Free resources associated with the kernel.
        """
        self.q_input.release()
        self.q_input = None

def _loops(parameters, form, form_volume, nq, call_details, values, cutoff):
    # type: (np.ndarray, Callable[[], np.ndarray], Callable[[], float], int, details.CallDetails, np.ndarray, np.ndarray, float) -> None
    ################################################################
    #                                                              #
    #   !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!   #
    #   !!                                                    !!   #
    #   !!  KEEP THIS CODE CONSISTENT WITH KERNEL_TEMPLATE.C  !!   #
    #   !!                                                    !!   #
    #   !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!   #
    #                                                              #
    ################################################################
    n_pars = len(parameters)
    parameters[:] = values[2:n_pars+2]
    if call_details.num_active == 0:
        pd_norm = float(form_volume())
        scale = values[0]/(pd_norm if pd_norm != 0.0 else 1.0)
        background = values[1]
        return scale*form() + background

    pd_value = values[2+n_pars:2+n_pars + call_details.num_weights]
    pd_weight = values[2+n_pars + call_details.num_weights:]

    pd_norm = 0.0
    spherical_correction = 1.0
    partial_weight = np.NaN
    weight = np.NaN

    p0_par = call_details.pd_par[0]
    p0_is_theta = (p0_par == call_details.theta_par)
    p0_length = call_details.pd_length[0]
    p0_index = p0_length
    p0_offset = call_details.pd_offset[0]

    pd_par = call_details.pd_par[:call_details.num_active]
    pd_offset = call_details.pd_offset[:call_details.num_active]
    pd_stride = call_details.pd_stride[:call_details.num_active]
    pd_length = call_details.pd_length[:call_details.num_active]

    total = np.zeros(nq, 'd')
    for loop_index in range(call_details.num_eval):
        # update polydispersity parameter values
        if p0_index == p0_length:
            pd_index = (loop_index//pd_stride)%pd_length
            parameters[pd_par] = pd_value[pd_offset+pd_index]
            partial_weight = np.prod(pd_weight[pd_offset+pd_index][1:])
            if call_details.theta_par >= 0:
                cor = sin(pi / 180 * parameters[call_details.theta_par])
                spherical_correction = max(abs(cor), 1e-6)
            p0_index = loop_index%p0_length

        weight = partial_weight * pd_weight[p0_offset + p0_index]
        parameters[p0_par] = pd_value[p0_offset + p0_index]
        if p0_is_theta:
            cor = cos(pi/180 * parameters[p0_par])
            spherical_correction = max(abs(cor), 1e-6)
        p0_index += 1
        if weight > cutoff:
            # Call the scattering function
            # Assume that NaNs are only generated if the parameters are bad;
            # exclude all q for that NaN.  Even better would be to have an
            # INVALID expression like the C models, but that is too expensive.
            Iq = np.asarray(form(), 'd')
            if np.isnan(Iq).any():
                continue

            # update value and norm
            weight *= spherical_correction
            total += weight * Iq
            pd_norm += weight * form_volume()

    scale = values[0]/(pd_norm if pd_norm != 0.0 else 1.0)
    background = values[1]
    return scale*total + background


def _create_default_functions(model_info):
    """
    Autogenerate missing functions, such as Iqxy from Iq.

    This only works for Iqxy when Iq is written in python. :func:`make_source`
    performs a similar role for Iq written in C.  This also vectorizes
    any functions that are not already marked as vectorized.
    """
    _create_vector_Iq(model_info)
    _create_vector_Iqxy(model_info)  # call create_vector_Iq() first


def _create_vector_Iq(model_info):
    """
    Define Iq as a vector function if it exists.
    """
    Iq = model_info.Iq
    if callable(Iq) and not getattr(Iq, 'vectorized', False):
        #print("vectorizing Iq")
        def vector_Iq(q, *args):
            """
            Vectorized 1D kernel.
            """
            return np.array([Iq(qi, *args) for qi in q])
        vector_Iq.vectorized = True
        model_info.Iq = vector_Iq

def _create_vector_Iqxy(model_info):
    """
    Define Iqxy as a vector function if it exists, or default it from Iq().
    """
    Iq, Iqxy = model_info.Iq, model_info.Iqxy
    if callable(Iqxy):
        if not getattr(Iqxy, 'vectorized', False):
            #print("vectorizing Iqxy")
            def vector_Iqxy(qx, qy, *args):
                """
                Vectorized 2D kernel.
                """
                return np.array([Iqxy(qxi, qyi, *args) for qxi, qyi in zip(qx, qy)])
            vector_Iqxy.vectorized = True
            model_info.Iqxy = vector_Iqxy
    elif callable(Iq):
        #print("defaulting Iqxy")
        # Iq is vectorized because create_vector_Iq was already called.
        def default_Iqxy(qx, qy, *args):
            """
            Default 2D kernel.
            """
            return Iq(np.sqrt(qx**2 + qy**2), *args)
        default_Iqxy.vectorized = True
        model_info.Iqxy = default_Iqxy