This file is indexed.

/usr/lib/python2.7/dist-packages/pandas/io/gbq.py is in python-pandas 0.13.1-2ubuntu2.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

"""
Pandas module to interface with Google BigQuery.
"""
import os
import sys
import tempfile
import csv
import logging
from datetime import datetime
import pkg_resources
from distutils.version import LooseVersion

import pandas as pd
import numpy as np

from pandas.core.common import PandasError
from pandas.core.frame import DataFrame
from pandas.tools.merge import concat

try:
    import bq
    import bigquery_client
    import gflags as flags
    _BQ_INSTALLED = True

    _BQ_VERSION = pkg_resources.get_distribution('bigquery').version
    if LooseVersion(_BQ_VERSION) >= '2.0.17':
        _BQ_VALID_VERSION = True
    else:
        _BQ_VALID_VERSION = False

except ImportError:
    _BQ_INSTALLED = False


# Setup the logger
logger = logging.getLogger('pandas.io.gbq')

# These are some custom exceptions that the
# to_gbq() method can throw


class SchemaMissing(PandasError, IOError):
    """
    Raised when attempting to write a DataFrame to
    a new table in Google BigQuery without specifying
    a schema describing the DataFrame.
    """
    pass


class InvalidSchema(PandasError, IOError):
    """
    Raised when attempting to write a DataFrame to
    Google BigQuery with an invalid table schema.
    """
    pass


class TableExistsFail(PandasError, IOError):
    """
    Raised when attempting to write a DataFrame to
    an existing Google BigQuery table without specifying
    that a replace/update action be taken.
    """
    pass


class InvalidColumnOrder(PandasError, IOError):
    """
    Raised when the provided column order for output
    results DataFrame does not match the schema
    returned by BigQuery.
    """
    pass


def _authenticate():
    """
    For testing, we abstract the authentication to BigQuery API.
    Presently this is implemented using the bq.py Client.Get()
    method. Any exceptions raised are considered fatal, so we
    do not process them.

    Returns
    -------
    BigqueryClient : Configured connection to Google BigQuery
    """
    return bq.Client.Get()


def _parse_entry(field_value, field_type):
    """
    Given a value and the corresponding BigQuery data type,
    perform any operations needed and return in a format
    appropriate for a numpy record dictionary

    Parameters
    ----------
    field_value : Source object to be transformed
    field_type : String representation of Google BigQuery
                 data type (per schema)

    Returns
    -------
    field_value : object or primitive of type corresponding
                  to field_type
    """

    # Avoid any casting problems
    if field_value is None or field_value == 'null':
        return None
    if field_type == 'INTEGER' or field_type == 'FLOAT':
        field_value = float(field_value)
    elif field_type == 'TIMESTAMP':
        timestamp = datetime.utcfromtimestamp(float(field_value))
        field_value = np.datetime64(timestamp)
    elif field_type == 'BOOLEAN':
        field_value = field_value == 'true'
    # Note that results are unicode, so this will
    # fail for non-ASCII characters. This probably
    # functions differently in Python 3
    else:
        field_value = str(field_value)
    return field_value


def _parse_page(raw_page, col_names, col_types, col_dtypes):
    """
    Given a list of rows produced by the client.apiclient.tabledata().list(),
    build a numpy array with proper dtypes and column names as specified
    by the arguments.

    Parameters
    ----------
    raw_page : Resulting list of rows from a page retrieved via
        bigquery API
        client.apiclient.tabledata().list().execute()['rows']
    col_names: An ordered list of names for the columns
    col_types: String representation of the BigQuery DataType for that
        column
    col_dtypes: Target numpy.dtype for the column

    Returns
    -------
    page_array : numpy record array corresponding
        to the page data
    """

    # Should be at most 100,000 per the API, but this could
    # be increased in the future. Should only be less than
    # this for the last page to reduce API calls
    page_row_count = len(raw_page)

    # Place to hold the results for a page of data
    page_array = np.zeros((page_row_count,), dtype=zip(col_names, col_dtypes))
    for row_num, raw_row in enumerate(raw_page):
        entries = raw_row.get('f', [])
        # Iterate over each entry - setting proper field types
        for col_num, field_type in enumerate(col_types):
            # Process the field's types using schema
            field_value = _parse_entry(entries[col_num].get('v', ''),
                                       field_type)
            # Fill the value into the final array
            page_array[row_num][col_num] = field_value

    return page_array


def _parse_data(client, job, index_col=None, col_order=None):
    """
    Iterate through the query results and piece together the
    final DataFrame. Builds a DataFrame for each page of
    results, then concatenates them together when finished.
    To save memory, we use numpy record arrays to build these
    DataFrames.

    Parameters
    ----------
    client: An instance of bq.Client
    job: A dict containing the job info for a completed query
    index_col: str (optional)
        Name of result column to use for index in results DataFrame
    col_order: list() (optional)
        List of BigQuery column names in the desired order for results
        DataFrame

    Returns
    -------
    df: pandas DataFrame
        DataFrame representing results of query

    Raises
    ------
    InvalidColumnOrder:
        Raised if 'col_order' parameter doesn't match returned DataFrame
    BigqueryError:
        Raised by bigquery_client if a Google API error is encountered


    Notes
    -----
    This script relies on Google being consistent with their
    pagination API. We are using the most flexible iteration method
    that we could find in the bq.py/bigquery_client.py APIs, but
    these have undergone large amounts of change recently.
    """

    # dtype Map -
    # see: http://pandas.pydata.org/pandas-docs/dev/missing_data.html#missing-data-casting-rules-and-indexing
    dtype_map = {'INTEGER': np.dtype(float),
                 'FLOAT': np.dtype(float),
                 'TIMESTAMP': 'M8[ns]'}     # This seems to be buggy without
                                            # nanosecond indicator

    # We first need the schema to get information about the columns of
    # our dataframe.

    table_dict = job['configuration']['query']['destinationTable']
    fields = client.GetTableSchema(table_dict)['fields']

    # Get the schema into a format useable to create our
    # dataframe
    col_dtypes = []
    col_types = []
    col_names = []

    # TODO: Do this in one clean step
    for field in fields:
        col_types.append(field['type'])
        # Note the encoding... numpy doesn't like titles that are UTF8, which
        # is the return type from the API
        col_names.append(field['name'].encode('ascii', 'ignore'))
        # Note, it would be nice to use 'str' types, but BigQuery doesn't have
        # a fixed length in mind - just maxes out at 64k
        col_dtypes.append(dtype_map.get(field['type'], object))

    # How many columns are there
    num_columns = len(col_names)

    # Iterate over the result rows.
    # Since Google's API now requires pagination of results,
    # we do that here. The following is repurposed from
    # bigquery_client.py  :: Client._JobTableReader._ReadOnePage

    # TODO: Enable Reading From Table,
    # see Client._TableTableReader._ReadOnePage

    # Initially, no page token is set
    page_token = None

    # This number is the current max results per page
    max_rows = bigquery_client._MAX_ROWS_PER_REQUEST

    # How many rows in result set? Initialize to max_rows
    total_rows = max_rows

    # This is the starting row for a particular page...
    # is ignored if page_token is present, though
    # it may be useful if we wish to implement SQL like LIMITs
    # with minimums
    start_row = 0

    # Keep our page DataFrames until the end when we concatenate them
    dataframe_list = list()

    current_job = job['jobReference']

    # Iterate over all rows
    while start_row < total_rows:
        # Setup the parameters for getQueryResults() API Call
        kwds = dict(current_job)
        kwds['maxResults'] = max_rows
        # Sets the timeout to 0 because we assume the table is already ready.
        # This is because our previous call to Query() is synchronous
        # and will block until it's actually done
        kwds['timeoutMs'] = 0
        # Use start row if there's no page_token ... in other words, the
        # user requested to start somewhere other than the beginning...
        # presently this is not a parameter to read_gbq(), but it will be
        # added eventually.
        if page_token:
            kwds['pageToken'] = page_token
        else:
            kwds['startIndex'] = start_row
        data = client.apiclient.jobs().getQueryResults(**kwds).execute()
        if not data['jobComplete']:
            raise bigquery_client.BigqueryError('Job was not completed, or was invalid')

        # How many rows are there across all pages?
        # Note: This is presently the only reason we don't just use
        # _ReadOnePage() directly
        total_rows = int(data['totalRows'])

        page_token = data.get('pageToken', None)
        raw_page = data.get('rows', [])
        page_array = _parse_page(raw_page, col_names, col_types, col_dtypes)

        start_row += len(raw_page)
        if total_rows > 0:
            completed = (100 * start_row) / total_rows
            logger.info('Remaining Rows: ' + str(total_rows - start_row) +
                        ' (' + str(completed) + '% Complete)')
        else:
            logger.info('No Rows')

        dataframe_list.append(DataFrame(page_array))

        # Did we get enough rows? Note: gbq.py stopped checking for this
        # but we felt it was still a good idea.
        if not page_token and not raw_page and start_row != total_rows:
            raise bigquery_client.BigqueryInterfaceError(
                'Not enough rows returned by server. Expected: {0} Rows, But '
                'Received {1}'.format(total_rows, start_row)
            )

    # Build final dataframe
    final_df = concat(dataframe_list, ignore_index=True)

    # Reindex the DataFrame on the provided column
    if index_col is not None:
        if index_col in col_names:
            final_df.set_index(index_col, inplace=True)
            col_names.remove(index_col)
        else:
            raise InvalidColumnOrder(
                'Index column "{0}" does not exist in DataFrame.'
                .format(index_col)
            )

    # Change the order of columns in the DataFrame based on provided list
    if col_order is not None:
        if sorted(col_order) == sorted(col_names):
            final_df = final_df[col_order]
        else:
            raise InvalidColumnOrder(
                'Column order does not match this DataFrame.'
            )

    # Downcast floats to integers and objects to booleans
    # if there are no NaN's. This is presently due to a
    # limitation of numpy in handling missing data.
    final_df._data = final_df._data.downcast(dtypes='infer')
    return final_df


def to_gbq(dataframe, destination_table, schema=None, col_order=None,
           if_exists='fail', **kwargs):
    """Write a DataFrame to a Google BigQuery table.

    THIS IS AN EXPERIMENTAL LIBRARY

    If the table exists, the DataFrame will be appended. If not, a new table
    will be created, in which case the schema will have to be specified. By
    default, rows will be written in the order they appear in the DataFrame,
    though the user may specify an alternative order.

    Parameters
    ----------
    dataframe : DataFrame
        DataFrame to be written
    destination_table : string
         name of table to be written, in the form 'dataset.tablename'
    schema : sequence (optional)
         list of column types in order for data to be inserted,
         e.g. ['INTEGER', 'TIMESTAMP', 'BOOLEAN']
    col_order : sequence (optional)
         order which columns are to be inserted,
         e.g. ['primary_key', 'birthday', 'username']
    if_exists : {'fail', 'replace', 'append'} (optional)
        - fail: If table exists, raise TableExistsFail instead of writing.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    kwargs are passed to the Client constructor

    Raises
    ------
    SchemaMissing :
        Raised if the 'if_exists' parameter is set to 'replace', but no schema
        is specified
    TableExistsFail :
        Raised if the specified 'destination_table' exists but the 'if_exists'
        parameter is set to 'fail' (the default)
    InvalidSchema :
        Raised if the 'schema' parameter does not match the provided DataFrame
    """

    if not _BQ_INSTALLED:
        if sys.version_info >= (3, 0):
            raise NotImplementedError('gbq module does not support Python 3 '
                                      'yet')
        else:
            raise ImportError('Could not import Google BigQuery Client.')

    if not _BQ_VALID_VERSION:
        raise ImportError("pandas requires bigquery >= 2.0.17 for Google "
                          "BigQuery support, current version " + _BQ_VERSION)

    ALLOWED_TYPES = ['STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'TIMESTAMP',
                     'RECORD']

    if if_exists == 'replace' and schema is None:
        raise SchemaMissing('Cannot replace a table without specifying the '
                            'data schema')
    else:
        client = _authenticate()
        table_reference = client.GetTableReference(destination_table)
        if client.TableExists(table_reference):
            if if_exists == 'fail':
                raise TableExistsFail('Cannot overwrite existing tables if '
                                      '\'if_exists="fail"\'')
            else:
                # Build up a string representation of the
                # table's schema. Since the table already
                # exists, we ask the API for it, which
                # is returned in a list of dictionaries
                # describing column data. Iterate over these
                # and build up a string of form:
                # "col_name1 : col_type1, col_name2 : col_type2..."
                schema_full = client.GetTableSchema(
                    dict(table_reference)
                )['fields']
                schema = ''
                for count, row in enumerate(schema_full):
                    if count > 0:
                        schema += ', '
                    schema += row['name'] + ':' + row['type']
        else:
            logger.info('Creating New Table')
            if schema is None:
                raise SchemaMissing('Cannot create a new table without '
                                    'specifying the data schema')
            else:
                columns = dataframe.columns
                if len(schema) != len(columns):
                    raise InvalidSchema('Incorrect number of columns in '
                                        'schema')
                else:
                    schema_string = ''
                    for count, name in enumerate(columns):
                        if count > 0:
                            schema_string += ', '
                        column_type = schema[count].upper()
                        if column_type in ALLOWED_TYPES:
                            schema_string += name + ':' + schema[count].lower()
                        else:
                            raise InvalidSchema('Invalid Type: ' + column_type
                                                + ". Must be one of: " +
                                                str(ALLOWED_TYPES))
                    schema = schema_string

    opts = kwargs
    opts['sync'] = True
    opts['skip_leading_rows'] = 1
    opts['encoding'] = 'UTF-8'
    opts['max_bad_records'] = 0

    # See: https://developers.google.com/bigquery/docs/reference/v2/jobs
    if if_exists == 'replace':
        opts['write_disposition'] = 'WRITE_TRUNCATE'
    elif if_exists == 'append':
        opts['write_disposition'] = 'WRITE_APPEND'

    with tempfile.NamedTemporaryFile() as csv_file:
        dataframe.to_csv(csv_file.name, index=False, encoding='utf-8')
        job = client.Load(table_reference, csv_file.name, schema=schema,
                          **opts)


def read_gbq(query, project_id=None, destination_table=None, index_col=None,
             col_order=None, **kwargs):
    """Load data from Google BigQuery.

    THIS IS AN EXPERIMENTAL LIBRARY

    The main method a user calls to load data from Google BigQuery into a
    pandas DataFrame. This is a simple wrapper for Google's bq.py and
    bigquery_client.py, which we use to get the source data. Because of this,
    this script respects the user's bq settings file, '~/.bigqueryrc', if it
    exists. Such a file can be generated using 'bq init'. Further, additional
    parameters for the query can be specified as either ``**kwds`` in the
    command, or using FLAGS provided in the 'gflags' module. Particular options
    can be found in bigquery_client.py.

    Parameters
    ----------
    query : str
        SQL-Like Query to return data values
    project_id : str (optional)
        Google BigQuery Account project ID. Optional, since it may be
        located in ~/.bigqueryrc
    index_col : str (optional)
        Name of result column to use for index in results DataFrame
    col_order : list(str) (optional)
        List of BigQuery column names in the desired order for results
        DataFrame
    destination_table : string (optional)
        If provided, send the results to the given table.
    **kwargs :
        To be passed to bq.Client.Create(). Particularly: 'trace',
        'sync', 'api', 'api_version'

    Returns
    -------
    df: DataFrame
        DataFrame representing results of query

    """
    if not _BQ_INSTALLED:
        if sys.version_info >= (3, 0):
            raise NotImplementedError('gbq module does not support Python 3 '
                                      'yet')
        else:
            raise ImportError('Could not import Google BigQuery Client.')

    if not _BQ_VALID_VERSION:
        raise ImportError('pandas requires bigquery >= 2.0.17 for Google '
                          'BigQuery support, current version ' + _BQ_VERSION)

    query_args = kwargs
    query_args['project_id'] = project_id
    query_args['query'] = query
    query_args['destination_table'] = destination_table
    query_args['sync'] = True

    client = _authenticate()

    job = client.Query(**query_args)

    return _parse_data(client, job, index_col=index_col, col_order=col_order)
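
For reference, here is a minimal usage sketch of the read_gbq and to_gbq entry points defined above. It assumes the Google bq client (version 2.0.17 or later) is installed and already authenticated, for example via 'bq init'; the project ID, dataset, and table names below are placeholders, not values taken from this file.

from pandas.io import gbq

# Run a query and load the results into a DataFrame, indexing on the
# 'id' column and fixing the order of the remaining columns.
df = gbq.read_gbq(
    'SELECT id, name, signup_date FROM [my_dataset.users] LIMIT 1000',
    project_id='my-project-id',          # placeholder project ID
    index_col='id',
    col_order=['name', 'signup_date'],
)

# Write a DataFrame to a table; a schema (one BigQuery type per column,
# in column order) is required when the destination table does not exist.
gbq.to_gbq(
    df.reset_index(),
    'my_dataset.users_copy',             # placeholder 'dataset.tablename'
    schema=['INTEGER', 'STRING', 'TIMESTAMP'],
    if_exists='fail',
)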