/usr/bin/gffToVcf is in pbgenomicconsensus 2.1.0-1.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 | #!/usr/bin/python
"""Utility for converting variant GFF3 files to 1000 Genomes VCF"""
import sys
import os
import time
import json
import logging
import argparse
import traceback
from pbcommand.models import FileTypes, get_pbparser
from pbcommand.cli import pbparser_runner
from pbcommand.utils import setup_log
from pbcore.io import GffReader, WriterBase
#
# (Ported from pbpy)
#
__version__ = "3.0"
log = logging.getLogger(__name__)
class Constants(object):
TASK_ID = "genomic_consensus.tasks.gff2vcf"
DRIVER_EXE = "gffToVcf --resolved-tool-contract "
GLOBAL_REFERENCE_ID = "genomic_consensus.task_options.global_reference"
class VcfRecord:
"""Models a record in a VCF3.3 file."""
def __init__(self):
self.chrom = ''
self.pos = 1
self.id = '.'
self.ref = ''
self.alt = ''
self.qual = -1.00
self.filter = '0'
self.info = {}
@staticmethod
def fromVariantGffRecord(gff):
vcf = VcfRecord()
vcf.chrom = gff.seqid
vcf.id = '.'
ref = gff.reference
if ref is None:
vcf.ref = "N"
else:
vcf.ref = ref
vcf.qual = float(gff.confidence)
vcf.put('NS', 1)
vcf.put('DP', gff.coverage)
feature = gff.type
vcf.pos = gff.start
if feature == 'insertion':
vcf.alt = 'I%s' % gff.variantSeq.upper()
elif feature == 'deletion':
vcf.alt = 'D%s' % len(gff.reference)
elif feature == 'substitution':
vcf.alt = gff.variantSeq.upper()
else:
print >> sys.stderr, 'Unsupported feature %s found in GFF3 file.' % feature
return vcf
def put(self, key, value):
self.info[key] = value
@staticmethod
def getHeader():
return 'CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO'
def _getInfoString(self):
return ';'.join(['%s=%s' % (k,v) \
for k,v in self.info.iteritems()])
def __str__(self):
return '%s\t%d\t%s\t%s\t%s\t%.2f\t%s\t%s' % \
(self.chrom, self.pos, self.id, self.ref, \
self.alt, self.qual, self.filter, self._getInfoString())
class VcfWriter(WriterBase):
"""Outputs VCF (1000 Genomes Variant Call Format) 3.3 files"""
def __init__(self, outfile):
self._outfile = outfile
self._start()
def close(self):
self._outfile.close()
def flush(self):
self._outfile.flush()
def _start(self):
self.writeMetaData('fileformat', 'VCFv3.3')
def writeHeader(self):
print >> self._outfile, '#%s' % VcfRecord.getHeader()
def writeMetaData(self, key, value):
print >> self._outfile, '##%s=%s' % (key, value)
def writeRecord( self, record ):
print >> self._outfile, str(record)
class GffToVcf(object):
"""Utility for converting variant GFF3 files to 1000 Genomes VCF"""
def __init__(self, gffFile, globalReference=None):
self.gffFile = gffFile
self.globalReference = globalReference
def _writeMetaData(self, writer):
currentTime = time.localtime()
cmdLine = os.path.basename(sys.argv[0]) + ' ' + ' '.join(sys.argv[1:])
writer.writeMetaData('fileDate', '%d%d%d' % \
(currentTime[0], currentTime[1], currentTime[2]))
writer.writeMetaData('source', cmdLine)
if self.globalReference is not None:
writer.writeMetaData('reference', self.globalReference)
writer.writeMetaData('INFO', 'NS,1,Integer,"Number of Samples with Data"')
writer.writeMetaData('INFO', 'DP,1,Integer,"Total Depth of Coverage"')
writer.writeHeader()
def run(self, out=sys.stdout):
with GffReader(self.gffFile) as reader, \
VcfWriter(out) as writer:
self._writeMetaData(writer)
for gff in reader:
vcf = VcfRecord.fromVariantGffRecord(gff)
writer.writeRecord(vcf)
return 0
def args_runner(args, out=sys.stdout):
return GffToVcf(
gffFile=args.gffFile,
globalReference=args.globalReference).run(out=out)
def resolved_tool_contract_runner(resolved_tool_contract):
rtc = resolved_tool_contract
with open(rtc.task.output_files[0], "w") as f:
gr = None #rtc.task.options[Constants.GLOBAL_REFERENCE_ID]
return GffToVcf(
gffFile=rtc.task.input_files[0],
globalReference=gr).run(out=f)
def get_contract_parser():
p = get_pbparser(
tool_id=Constants.TASK_ID,
version=__version__,
name="gffToVcf",
description=__doc__,
driver_exe=Constants.DRIVER_EXE,
default_level="ERROR")
ap = p.arg_parser.parser
tcp = p.tool_contract_parser
p.add_input_file_type(FileTypes.GFF, "gffFile",
"GFF file", "GFF file")
tcp.add_output_file_type(FileTypes.VCF, "vcf", "VCF file", "VCF file",
default_name="output")
ap.add_argument("--globalReference", action="store", default=None,
help="Name of global reference to put in Meta field")
return p
def main(argv=sys.argv):
return pbparser_runner(
argv=argv[1:],
parser=get_contract_parser(),
args_runner_func=args_runner,
contract_runner_func=resolved_tool_contract_runner,
alog=log,
setup_log_func=setup_log)
if __name__ == '__main__':
sys.exit(main())
|