/usr/lib/python3/dist-packages/bropkg/source.py is in bro-pkg 1.3.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | """
A module containing the definition of a "package source": a git repository
containing a collection of :file:`bro-pkg.index` files. These are simple INI
files that can describe many Bro packages. Each section of the file names
a Bro package along with the git URL where it is located and metadata tags that
help classify/describe it.
"""
import os
import shutil
import git
try:
from backports import configparser
except ImportError as err:
import configparser
from . import LOG
from .package import (
name_from_path,
Package
)
from ._util import git_clone_shallow
#: The name of package index files.
INDEX_FILENAME = 'bro-pkg.index'
#: The name of the package source file where package metadata gets aggregated.
AGGREGATE_DATA_FILE = 'aggregate.meta'
class Source(object):
"""A Bro package source.
This class contains properties of a package source like its name, remote git
URL, and local git clone.
Attributes:
name (str): The name of the source as given by a config file key
in it's ``[sources]`` section.
git_url (str): The git URL of the package source.
clone (git.Repo): The local git clone of the package source.
"""
def __init__(self, name, clone_path, git_url):
"""Create a package source.
Raises:
git.exc.GitCommandError: if the git repo is invalid
OSError: if the git repo is invalid and can't be re-initialized
"""
git_url = os.path.expanduser(git_url)
self.name = name
self.git_url = git_url
self.clone = None
try:
self.clone = git.Repo(clone_path)
except git.exc.NoSuchPathError:
LOG.debug('creating source clone of "%s" at %s', name, clone_path)
self.clone = git_clone_shallow(git_url, clone_path)
except git.exc.InvalidGitRepositoryError:
LOG.debug('deleting invalid source clone of "%s" at %s',
name, clone_path)
shutil.rmtree(clone_path)
self.clone = git_clone_shallow(git_url, clone_path)
else:
LOG.debug('found source clone of "%s" at %s', name, clone_path)
old_url = self.clone.git.config('--local', '--get',
'remote.origin.url')
if git_url != old_url:
LOG.debug(
'url of source "%s" changed from %s to %s, reclone at %s',
name, old_url, git_url, clone_path)
shutil.rmtree(clone_path)
self.clone = git_clone_shallow(git_url, clone_path)
def __str__(self):
return self.git_url
def __repr__(self):
return self.git_url
def package_index_files(self):
"""Return a list of paths to package index files in the source."""
rval = []
visited_dirs = set()
for root, dirs, files in os.walk(self.clone.working_dir,
followlinks=True):
stat = os.stat(root)
visited_dirs.add((stat.st_dev, stat.st_ino))
dirs_to_visit_next = []
for d in dirs:
stat = os.stat(os.path.join(root, d))
if (stat.st_dev, stat.st_ino) not in visited_dirs:
dirs_to_visit_next.append(d)
dirs[:] = dirs_to_visit_next
try:
dirs.remove('.git')
except ValueError:
pass
for filename in files:
if filename == INDEX_FILENAME:
rval.append(os.path.join(root, filename))
return sorted(rval)
def packages(self):
"""Return a list of :class:`.package.Package` in the source."""
rval = []
# Use raw parser so no value interpolation takes place.
parser = configparser.RawConfigParser()
aggregate_file = os.path.join(
self.clone.working_dir, AGGREGATE_DATA_FILE)
parser.read(aggregate_file)
for index_file in self.package_index_files():
relative_path = index_file[len(self.clone.working_dir) + 1:]
directory = os.path.dirname(relative_path)
lines = []
with open(index_file) as f:
lines = [line.rstrip('\n') for line in f]
for url in lines:
pkg_name = name_from_path(url)
agg_key = os.path.join(directory, pkg_name)
metadata = {}
if parser.has_section(agg_key):
metadata = {key: value for key,
value in parser.items(agg_key)}
package = Package(git_url=url, source=self.name,
directory=directory, metadata=metadata)
rval.append(package)
return rval
|