/usr/share/pyshared/impacket/dcerpc/atsvc.py is in python-impacket 0.9.10-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 | # Copyright (c) 2003-2012 CORE Security Technologies
#
# This software is provided under under a slightly modified version
# of the Apache Software License. See the accompanying LICENSE file
# for more information.
#
# $Id: atsvc.py 555 2012-05-22 03:28:17Z bethus@gmail.com $
#
# Author: Alberto Solino
#
# Description:
# ATSVC implementation of some methods [MS-TSCH]
#
from struct import *
from impacket.structure import Structure
from impacket import dcerpc
from impacket.dcerpc import ndrutils, dcerpc
from impacket.uuid import uuidtup_to_bin
MSRPC_UUID_ATSVC = uuidtup_to_bin(('1FF70682-0A51-30E8-076D-740BE8CEE98B', '1.0'))
MSRPC_UUID_SASEC = uuidtup_to_bin(('378E52B0-C0A9-11CF-822D-00AA0051E40F', '1.0'))
MSRPC_UUID_TSS = uuidtup_to_bin(('86D35949-83C9-4044-B424-DB363231FD0C', '1.0'))
# Constants
S_OK = 0x00000000
S_FALSE = 0x00000001
E_OUTOFMEMORY = 0x80000002
E_ACCESSDENIED = 0x80000009
E_INVALIDARG = 0x80000003
E_FAIL = 0x80000008
E_UNEXPECTED = 0x8000FFFF
# Structures
class AT_INFO(Structure):
structure = (
('JobTime', '<L=0xff'),
('DaysOfMonth','<L=0'),
('DaysOfWeek','<B=0'),
('Flags','<B=0'),
('unknown','<H=0xffff'),
('Command',':',ndrutils.NDRUniqueStringW),
)
# Opnums
class ATSVCNetrJobAdd(Structure):
opnum = 0
#alignment = 4
structure = (
('ServerName',':',ndrutils.NDRUniqueStringW),
('pAtInfo',':',AT_INFO),
)
class ATSVCNetrJobEnum(Structure):
opnum = 2
alignment = 4
structure = (
('ServerName',':',ndrutils.NDRUniqueStringW),
('TotalEntries','<L=0'),
('pEnumContainer','<L=0'),
('PreferredMaximumLength','<L=0xffffffff'),
('refId','<L=1'),
('ResumeHandle','<L=0xff'),
)
class ATSVCNetrJobEnumResponse(Structure):
structure = (
('EntriedRead','<L'),
('RefId','<L'),
('Count','<L'),
('Size','_-Entries','len(self.rawData)-7*4'),
('Entries',':'),
('TotalEntries','<L'),
('RedId2','<L'),
('ResumeHandle','<L=0xff'),
)
class ATSVCSchRpcEnumTasks(Structure):
opnum = 7
alignment = 4
structure = (
('Path',':',ndrutils.NDRStringW),
('Flags','<L'),
('StartIndex','<L=0'),
('cRequested','<L=1'),
)
class ATSVCSchRpcEnumTasksResp(Structure):
structure = (
('pcNames','<L'),
('Count','<L'),
('RefId','<L'),
('Count2','<L'),
('TaskName',':',ndrutils.NDRUniqueStringW),
('ErrorCode','<L'),
)
class ATSVCSessionError(Exception):
# ToDo: Complete this stuff
error_messages = {
}
def __init__( self, error_code):
Exception.__init__(self)
self.error_code = error_code
def get_error_code( self ):
return self.error_code
def __str__( self ):
key = self.error_code
if (ATSVCSessionError.error_messages.has_key(key)):
error_msg_short = ATSVCSessionError.error_messages[key][0]
error_msg_verbose = ATSVCSessionError.error_messages[key][1]
return 'ATSVC SessionError: code: %s - %s - %s' % (str(self.error_code), error_msg_short, error_msg_verbose)
else:
return 'ATSVC SessionError: unknown error code: %s' % (str(self.error_code))
class DCERPCAtSvc:
def __init__(self, dcerpc):
self._dcerpc = dcerpc
def doRequest(self, request, noAnswer = 0, checkReturn = 1):
self._dcerpc.call(request.opnum, request)
if noAnswer:
return
else:
answer = self._dcerpc.recv()
if checkReturn and answer[-4:] != '\x00\x00\x00\x00':
error_code = unpack("<L", answer[-4:])[0]
raise ATSVCSessionError(error_code)
return answer
def NetrJobAdd(self, serverName, atInfo):
jobAdd = ATSVCNetrJobAdd()
jobAdd['ServerName'] = ndrutils.NDRUniqueStringW()
jobAdd['ServerName']['Data'] = (serverName+'\x00').encode('utf-16le')
jobAdd['pAtInfo'] = atInfo
ans = self.doRequest(jobAdd, checkReturn = 1)
return ans
def NetrJobEnum(self, serverName, resumeHandle = 0x0 ):
jobEnum = ATSVCNetrJobEnum()
jobEnum['ServerName'] = ndrutils.NDRUniqueStringW()
jobEnum['ServerName']['Data'] = (serverName+'\x00').encode('utf-16le')
jobEnum['ResumeHandle'] = resumeHandle
packet = self.doRequest(jobEnum, checkReturn = 1)
ans = ATSVCNetrJobEnumResponse(packet)
return ans
def SchRpcEnumTasks(self, path, startIndex=0, flags=0):
enumTasks = ATSVCSchRpcEnumTasks()
enumTasks['Path'] = ndrutils.NDRStringW()
enumTasks['Path']['Data'] = (path+'\x00').encode('utf-16le')
enumTasks['StartIndex'] = startIndex
enumTasks['Flags'] = 0
packet = self.doRequest(enumTasks, checkReturn = 0)
ans = ATSVCSchRpcEnumTasksResp(packet)
return ans
|