Package pyx12 :: Module errh_xml
[hide private]

Source Code for Module pyx12.errh_xml

  1  ###################################################################### 
  2  # Copyright (c) 2001-2005 Kalamazoo Community Mental Health Services, 
  3  #   John Holland <jholland@kazoocmh.org> <john@zoner.org> 
  4  # All rights reserved. 
  5  # 
  6  # This software is licensed as described in the file LICENSE.txt, which 
  7  # you should have received as part of this distribution. 
  8  # 
  9  ###################################################################### 
 10   
 11  #    $Id: errh_xml.py 961 2007-03-23 21:35:20Z johnholland $ 
 12   
 13  """ 
 14  Capture X12 Errors 
 15  """ 
 16   
 17  import logging 
 18  from types import * 
 19  #import pdb 
 20  import tempfile 
 21  #import lxml 
 22  import os 
 23   
 24  # Intrapackage imports 
 25  from errors import * 
 26  from xmlwriter import XMLWriter 
 27   
 28  #class error_node: 
 29  #    def __init__(self) 
 30   
 31  logger = logging.getLogger('pyx12.errh_xml') 
 32  #logger.setLevel(logging.DEBUG) 
 33  #logger.setLevel(logging.ERROR) 
 34   
35 -class err_handler(object):
36 """ 37 The interface to the error handling structures. 38 """
39 - def __init__(self, xml_out=None, basedir=None):
40 """ 41 @param xml_out: Output filename, if None, will dump to tempfile 42 @param basedir: working directory, where file will be created 43 """ 44 if xml_out: 45 self.filename = xml_out 46 fd = open(xml_out, 'w') 47 else: 48 try: 49 (fdesc, self.filename) = tempfile.mkstemp('.xml', 'pyx12_') 50 fd = os.fdopen(fdesc, 'w+b') 51 #fd = tempfile.NamedTemporaryFile() 52 #self.filename = fd.name 53 except: 54 #self.filename = '997.tmp.xml' 55 (fdesc, self.filename) = tempfile.mkstemp(suffix='.xml', prefix='pyx12_', dir=basedir) 56 fd = os.fdopen(fdesc, 'w+b') 57 #fd = file(os.path.join(basedir, self.filename), 'w') 58 self.cur_line = None 59 self.errors = [] 60 if not fd: 61 raise EngineError, 'Could not open temp error xml file' 62 self.writer = XMLWriter(fd) 63 #self.writer.doctype( 64 # u"x12simple", u"-//J Holland//DTD XML X12 Document Conversion1.0//EN//XML", 65 # u"%s" % (dtd_urn)) 66 self.writer.push(u"x12err")
67
68 - def __del__(self):
69 while len(self.writer) > 0: 70 self.writer.pop()
71
72 - def getFilename(self):
73 return self.filename
74
75 - def handleErrors(self, err_list):
76 """ 77 @param err_list: list of errors to apply 78 """ 79 self.errors.extend(err_list)
80 #for (err_type, err_cde, err_str, err_val, src_line) in err_list: 81 # if err_type == 'isa': 82 # self.isa_error(err_cde, err_str) 83 # elif err_type == 'gs': 84 # self.gs_error(err_cde, err_str) 85 # elif err_type == 'st': 86 # self.st_error(err_cde, err_str) 87 # elif err_type == 'seg': 88 # self.seg_error(err_cde, err_str, err_val, src_line) 89
90 - def getCurLine(self):
91 """ 92 @return: Current file line number 93 @rtype: int 94 """ 95 return self.cur_line
96
97 - def Write(self, cur_line):
98 """ 99 Generate XML for the segment data and matching map node 100 101 """ 102 if len(self.errors) > 0: 103 self.writer.push(u"seg", attrs={u'line': '%i'%(cur_line)}) 104 for (err_type, err_cde, err_str, err_val, src_line) in self.errors: 105 self.writer.push(u"err", attrs={u"code": err_cde}) 106 #self.writer.elem(u"type", err_type) 107 #self.writer.elem(u"code", err_cde) 108 self.writer.elem(u"desc", err_str) 109 if err_val: 110 self.writer.elem(u"errval", err_val) 111 #self.writer.push(u"seg", {u'line': '%i'%(cur_line)}) 112 #self.writer.elem(u'ele', seg_data.get_value('%02i' % (i+1)), 113 # attrs={u'id': child_node.id}) 114 self.writer.pop() #end err 115 self.writer.pop() #end segment 116 self.errors = []
117
118 -class ErrorErrhNull(Exception):
119 """Class for errh_null errors."""
120
121 -class errh_list(object):
122 """ 123 A null error object - used for testing. 124 Stores the current error in simple variables. 125 """
126 - def __init__(self):
127 #self.id = 'ROOT' 128 self.errors = [] 129 #self.cur_node = self 130 self.cur_line = 0
131 #self.err_cde = None 132 #self.err_str = None 133
134 - def get_errors(self):
135 return self.errors
136
137 - def reset(self):
138 self.errors = []
139
140 - def get_cur_line(self):
141 """ 142 @return: Current file line number 143 @rtype: int 144 """ 145 return self.cur_line
146
147 - def set_cur_line(self, cur_line):
148 """ 149 """ 150 self.cur_line = cur_line
151 152 # def get_id(self): 153 # """ 154 # @return: Error node type 155 # @rtype: string 156 # """ 157 # return self.id 158
159 - def add_isa_loop(self, seg, src):
160 """ 161 """ 162 #raise ErrorErrhNull, 'add_isa loop' 163 pass
164
165 - def add_gs_loop(self, seg, src):
166 """ 167 """ 168 pass
169
170 - def add_st_loop(self, seg, src):
171 """ 172 """ 173 pass
174
175 - def add_seg(self, map_node, seg, seg_count, cur_line, ls_id):
176 """ 177 """ 178 pass
179
180 - def add_ele(self, map_node):
181 """ 182 """ 183 pass
184
185 - def isa_error(self, err_cde, err_str):
186 """ 187 @param err_cde: ISA level error code 188 @type err_cde: string 189 @param err_str: Description of the error 190 @type err_str: string 191 """ 192 self.errors.append(('isa', err_cde, err_str, None, None)) 193 sout = '' 194 sout += 'Line:%i ' % (self.cur_line) 195 sout += 'ISA:%s - %s' % (err_cde, err_str) 196 logger.error(sout)
197
198 - def gs_error(self, err_cde, err_str):
199 """ 200 @param err_cde: GS level error code 201 @type err_cde: string 202 @param err_str: Description of the error 203 @type err_str: string 204 """ 205 self.errors.append(('gs', err_cde, err_str, None, None)) 206 sout = '' 207 sout += 'Line:%i ' % (self.cur_line) 208 sout += 'GS:%s - %s' % (err_cde, err_str) 209 logger.error(sout)
210
211 - def st_error(self, err_cde, err_str):
212 """ 213 @param err_cde: Segment level error code 214 @type err_cde: string 215 @param err_str: Description of the error 216 @type err_str: string 217 """ 218 self.errors.append(('st', err_cde, err_str, None, None)) 219 sout = '' 220 sout += 'Line:%i ' % (self.cur_line) 221 sout += 'ST:%s - %s' % (err_cde, err_str) 222 logger.error(sout)
223
224 - def seg_error(self, err_cde, err_str, err_value=None, src_line=None):
225 """ 226 @param err_cde: Segment level error code 227 @type err_cde: string 228 @param err_str: Description of the error 229 @type err_str: string 230 """ 231 self.errors.append(('seg', err_cde, err_str, err_value, src_line)) 232 sout = '' 233 sout += 'Line:%i ' % (self.cur_line) 234 sout += 'SEG:%s - %s' % (err_cde, err_str) 235 if err_value: 236 sout += ' (%s)' % err_value 237 logger.error(sout)
238
239 - def ele_error(self, err_cde, err_str, bad_value):
240 """ 241 @param err_cde: Element level error code 242 @type err_cde: string 243 @param err_str: Description of the error 244 @type err_str: string 245 """ 246 self.errors.append(('ele', err_cde, err_str, bad_value, None)) 247 sout = '' 248 sout += 'Line:%i ' % (self.cur_line) 249 sout += 'ELE:%s - %s' % (err_cde, err_str) 250 if bad_value: 251 sout += ' (%s)' % (bad_value) 252 logger.error(sout)
253
254 - def close_isa_loop(self, node, seg, src):
255 """ 256 """ 257 pass
258
259 - def close_gs_loop(self, node, seg, src):
260 """ 261 """ 262 pass
263
264 - def close_st_loop(self, node, seg, src):
265 """ 266 """ 267 pass
268
269 - def find_node(self, type):
270 """ 271 Find the last node of a type 272 """ 273 pass
274
275 - def get_parent(self):
276 return None
277 278 # def get_first_child(self): 279 # """ 280 # """ 281 # if len(self.children) > 0: 282 # return self.children[0] 283 # else: 284 # return None 285
286 - def get_next_sibling(self):
287 """ 288 """ 289 return None
290
291 - def get_error_count(self):
292 """ 293 """ 294 return len(self.errors)
295
296 - def is_closed(self):
297 """ 298 @rtype: boolean 299 """ 300 return True
301