Package pyx12 :: Module x12file
[hide private]

Source Code for Module pyx12.x12file

  1  ###################################################################### 
  2  # Copyright (c) 2001-2005 Kalamazoo Community Mental Health Services, 
  3  #   John Holland <jholland@kazoocmh.org> <john@zoner.org> 
  4  # All rights reserved. 
  5  # 
  6  # This software is licensed as described in the file LICENSE.txt, which 
  7  # you should have received as part of this distribution.   
  8  # 
  9  ###################################################################### 
 10   
 11  #    $Id: x12file.py 1073 2007-05-03 21:06:10Z johnholland $ 
 12   
 13  """ 
 14  Interface to an X12 data file. 
 15  Efficiently handles large files. 
 16  Tracks end of explicit loops. 
 17  Tracks segment/line/loop counts. 
 18  """ 
 19   
 20  import sys 
 21  #import string 
 22  #from types import * 
 23  import logging 
 24  #import pdb 
 25   
 26  # Intrapackage imports 
 27  import pyx12.errors 
 28  import pyx12.segment 
 29   
 30  DEFAULT_BUFSIZE = 8*1024 
 31  ISA_LEN = 106 
 32   
 33  logger = logging.getLogger('pyx12.x12file') 
 34  #logger.setLevel(logging.DEBUG) 
 35  #logger.setLevel(logging.ERROR) 
 36   
class X12file(object):
    """
    Interface to an X12 data file

    Iterating over an instance yields one pyx12.segment.Segment per X12
    segment.  While iterating, the object tracks the open ISA/GS/ST/LS
    loops, the HL hierarchy and the segment counts, and records structural
    problems in an error list which the caller collects with pop_errors().
    """

    # Segment identifiers that affect the tracking state, mapped to the name
    # of the method that handles them.  Every other segment only increments
    # the transaction segment count.
    _LOOP_HANDLERS = {
        'ISA': '_track_isa',
        'IEA': '_track_iea',
        'GS': '_track_gs',
        'GE': '_track_ge',
        'ST': '_track_st',
        'SE': '_track_se',
        'LS': '_track_ls',
        'LE': '_track_le',
        'HL': '_track_hl',
    }

    def __init__(self, src_file_obj):
        """
        Initialize the file

        Reads the first ISA_LEN characters as the ISA segment, takes the
        segment, element and sub-element terminators from it and primes the
        read buffer.

        @param src_file_obj: absolute path of source file or fd
            ('-' selects standard input)
        @type src_file_obj: string or open file object
        @raise pyx12.errors.X12Error: if the data does not start with 'ISA'
            or fewer than ISA_LEN characters could be read
        """
        if hasattr(src_file_obj, 'closed'):
            # Anything exposing 'closed' is treated as an open file object.
            self.fd = src_file_obj
        elif src_file_obj == '-':
            self.fd = sys.stdin
        else:
            try:
                self.fd = open(src_file_obj, 'U')
            except ValueError:
                # Python 3.11+ rejects the 'U' flag; plain text mode
                # already applies universal newlines there.
                self.fd = open(src_file_obj, 'r')
        self.err_list = []
        self.loops = []       # stack of (segment id, control number)
        self.hl_stack = []    # HL counters of the currently open HL levels
        self.gs_count = 0
        self.st_count = 0
        self.hl_count = 0
        self.seg_count = 0
        self.cur_line = 0
        self.buffer = None
        self.isa_ids = []
        self.gs_ids = []
        self.st_ids = []
        self.isa_usage = None

        line = self.fd.read(ISA_LEN)
        if line[:3] != 'ISA':
            err_str = "First line does not begin with 'ISA': %s" % line[:3]
            raise pyx12.errors.X12Error(err_str)
        if len(line) != ISA_LEN:
            err_str = 'ISA line is only %i characters' % len(line)
            raise pyx12.errors.X12Error(err_str)
        self.seg_term = line[-1]
        self.ele_term = line[3]
        self.subele_term = line[-2]
        logger.debug('seg_term "%s" / ele_term "%s" / subele_term "%s"',
                     self.seg_term, self.ele_term, self.subele_term)

        # The ISA text stays in the buffer so that iteration yields it too.
        self.buffer = line + self.fd.read(DEFAULT_BUFSIZE)

    def __del__(self):
        """
        Close the source when the object is collected (best effort)
        """
        try:
            self.fd.close()
        except Exception:
            # self.fd may be missing if __init__ failed before setting it,
            # and a failing close() must not raise from a finalizer.
            pass

    def __iter__(self):
        """
        @return: the object itself; it is its own iterator
        """
        return self

    def next(self):
        """
        Iterate over input file segments

        Clears the error list, reads the next segment with a valid
        identifier and updates the loop/count tracking for it.

        @return: the next segment
        @rtype: pyx12.segment.Segment
        @raise StopIteration: when no further terminated segment exists
        @raise pyx12.errors.X12Error: if an ISA segment does not have 16
            elements
        @raise IndexError: if a loop trailer (IEA, GE, SE, LE) arrives while
            no loop is open
        """
        self.err_list = []
        seg = self._read_segment()
        if seg is None:
            raise StopIteration

        handler_name = self._LOOP_HANDLERS.get(seg.get_seg_id())
        if handler_name is None:
            self.seg_count += 1
        else:
            getattr(self, handler_name)(seg)

        self.cur_line += 1
        return seg

    # Python 3 spelling of the iterator protocol.
    __next__ = next

    def _read_raw_segment(self):
        """
        Cut the next non-blank segment text off the front of the buffer

        The buffer is refilled until it holds a segment terminator.  Data
        left after the last terminator at end of input is not returned.

        @return: segment text without terminator, CR or LF; None at end of
            input
        @rtype: string
        """
        while True:
            while self.seg_term not in self.buffer:
                chunk = self.fd.read(DEFAULT_BUFSIZE)
                if not chunk:
                    return None
                self.buffer += chunk
            (line, self.buffer) = self.buffer.split(self.seg_term, 1)
            line = line.replace('\n', '').replace('\r', '')
            if line != '':
                return line

    def _read_segment(self):
        """
        Read segments until one with a valid identifier is found

        Segments with an invalid identifier are reported and skipped.

        @return: the parsed segment, or None at end of input
        @rtype: pyx12.segment.Segment
        """
        while True:
            line = self._read_raw_segment()
            if line is None:
                return None
            # cur_line is only advanced once the segment is accepted
            src_line = self.cur_line + 1
            if line[-1] == self.ele_term:
                err_str = 'Segment contains trailing element terminators'
                self._seg_error('SEG1', err_str, None, src_line=src_line)
            seg = pyx12.segment.Segment(line, self.seg_term, self.ele_term,
                                        self.subele_term)
            if seg.is_empty():
                err_str = 'Segment "%s" is empty' % (line)
                self._seg_error('8', err_str, None, src_line=src_line)
            if seg.is_seg_id_valid():
                return seg
            err_str = 'Segment identifier "%s" is invalid' % (
                seg.get_seg_id())
            self._seg_error('1', err_str, None, src_line=src_line)

    def _track_isa(self, seg):
        """
        Open an ISA loop and reset the per-interchange GS tracking
        """
        if len(seg) != 16:
            raise pyx12.errors.X12Error(
                'The ISA segment must have 16 elements (%s)' % (seg))
        interchange_control_number = seg.get_value('ISA13')
        if interchange_control_number in self.isa_ids:
            err_str = 'ISA Interchange Control Number '
            err_str += '%s not unique within file' \
                % (interchange_control_number)
            self._isa_error('025', err_str)
        self.loops.append(('ISA', interchange_control_number))
        self.isa_ids.append(interchange_control_number)
        self.gs_count = 0
        self.gs_ids = []
        self.isa_usage = seg.get_value('ISA15')

    def _track_iea(self, seg):
        """
        Close the ISA loop, checking its control number and GS count
        """
        if self.loops[-1][0] != 'ISA':
            # Unterminated GS loop
            err_str = 'Unterminated Loop %s' % (self.loops[-1][0])
            self._isa_error('024', err_str)
            del self.loops[-1]
        iea_id = seg.get_value('IEA02')
        if self.loops[-1][1] != iea_id:
            err_str = 'IEA id=%s does not match ISA id=%s' % \
                (iea_id, self.loops[-1][1])
            self._isa_error('001', err_str)
        if self._int(seg.get_value('IEA01')) != self.gs_count:
            err_str = 'IEA count for IEA02=%s is wrong' % (iea_id)
            self._isa_error('021', err_str)
        del self.loops[-1]

    def _track_gs(self, seg):
        """
        Open a GS loop and reset the per-group ST tracking
        """
        group_control_number = seg.get_value('GS06')
        if group_control_number in self.gs_ids:
            err_str = 'GS Interchange Control Number '
            err_str += '%s not unique within file' \
                % (group_control_number)
            self._gs_error('6', err_str)
        self.gs_count += 1
        self.gs_ids.append(group_control_number)
        self.loops.append(('GS', group_control_number))
        self.st_count = 0
        self.st_ids = []

    def _track_ge(self, seg):
        """
        Close the GS loop, checking its control number and ST count
        """
        if self.loops[-1][0] != 'GS':
            err_str = 'Unterminated segment %s' % (self.loops[-1][1])
            self._gs_error('3', err_str)
            del self.loops[-1]
        ge_id = seg.get_value('GE02')
        if self.loops[-1][1] != ge_id:
            err_str = 'GE id=%s does not match GS id=%s' % \
                (ge_id, self.loops[-1][1])
            self._gs_error('4', err_str)
        ge_count = seg.get_value('GE01')
        if self._int(ge_count) != self.st_count:
            err_str = 'GE count of %s for GE02=%s is wrong. I count %i' \
                % (ge_count, ge_id, self.st_count)
            self._gs_error('5', err_str)
        del self.loops[-1]

    def _track_st(self, seg):
        """
        Open an ST loop and reset the per-transaction counters
        """
        self.hl_stack = []
        self.hl_count = 0
        transaction_control_number = seg.get_value('ST02')
        if transaction_control_number in self.st_ids:
            err_str = 'ST Interchange Control Number '
            err_str += '%s not unique within file' \
                % (transaction_control_number)
            self._st_error('23', err_str)
        self.st_count += 1
        self.st_ids.append(transaction_control_number)
        self.loops.append(('ST', transaction_control_number))
        # The ST segment itself is the first counted segment
        self.seg_count = 1

    def _track_se(self, seg):
        """
        Close the ST loop, checking its control number and segment count
        """
        se_trn_control_num = seg.get_value('SE02')
        if self.loops[-1][0] != 'ST' or \
                self.loops[-1][1] != se_trn_control_num:
            err_str = 'SE id=%s does not match ST id=%s' % \
                (se_trn_control_num, self.loops[-1][1])
            self._st_error('3', err_str)
        # seg_count has not been incremented for the SE segment itself
        expected_count = self.seg_count + 1
        se_count = seg.get_value('SE01')
        if self._int(se_count) != expected_count:
            err_str = 'SE count of %s for SE02=%s is wrong. I count %i' \
                % (se_count, se_trn_control_num, expected_count)
            self._st_error('4', err_str)
        del self.loops[-1]

    def _track_ls(self, seg):
        """
        Open an explicit LS loop
        """
        self.seg_count += 1
        # NOTE(review): the loop id is read from 'LS06'; confirm that this
        # is the intended element reference.
        self.loops.append(('LS', seg.get_value('LS06')))

    def _track_le(self, seg):
        """
        Close the innermost open loop on an LE segment
        """
        self.seg_count += 1
        del self.loops[-1]

    def _track_hl(self, seg):
        """
        Check the HL counter and parent, then push this HL on the stack
        """
        self.seg_count += 1
        self.hl_count += 1
        hl_count = seg.get_value('HL01')
        if self.hl_count != self._int(hl_count):
            err_str = 'My HL count %i does not match your HL count %s' \
                % (self.hl_count, hl_count)
            self._seg_error('HL1', err_str)
        if seg.get_value('HL02') != '':
            hl_parent = self._int(seg.get_value('HL02'))
            if hl_parent not in self.hl_stack:
                # %s rather than %i: hl_parent is None when HL02 is not a
                # number, and %i would raise TypeError on None.
                err_str = 'HL parent (%s) is not a valid parent' \
                    % (hl_parent)
                self._seg_error('HL2', err_str)
            # Unwind to the parent (empties the stack if it is not found)
            while self.hl_stack and hl_parent != self.hl_stack[-1]:
                del self.hl_stack[-1]
        # A blank HL02 with a non-empty stack is deliberately not reported.
        self.hl_stack.append(self.hl_count)

    def get_errors(self):
        """
        Get Errors
        DEPRECATED

        @raise pyx12.errors.EngineError: always
        """
        raise pyx12.errors.EngineError(
            'X12file.get_errors is no longer used')

    def pop_errors(self):
        """
        Pop error list

        @return: List of errors recorded since the list was last cleared;
            the internal list is emptied
        @rtype: list of tuple
        """
        tmp = self.err_list
        self.err_list = []
        return tmp

    def _isa_error(self, err_cde, err_str):
        """
        Record an interchange level error

        @param err_cde: ISA level error code
        @type err_cde: string
        @param err_str: Description of the error
        @type err_str: string
        """
        self.err_list.append(('isa', err_cde, err_str, None, None))

    def _gs_error(self, err_cde, err_str):
        """
        Record a functional group level error

        @param err_cde: GS level error code
        @type err_cde: string
        @param err_str: Description of the error
        @type err_str: string
        """
        self.err_list.append(('gs', err_cde, err_str, None, None))

    def _st_error(self, err_cde, err_str):
        """
        Record a transaction set level error

        @param err_cde: ST level error code
        @type err_cde: string
        @param err_str: Description of the error
        @type err_str: string
        """
        self.err_list.append(('st', err_cde, err_str, None, None))

    def _seg_error(self, err_cde, err_str, err_value=None, src_line=None):
        """
        Record a segment level error

        @param err_cde: Segment level error code
        @type err_cde: string
        @param err_str: Description of the error
        @type err_str: string
        @param err_value: value stored with the error, if any
        @param src_line: line number stored with the error, if any
        @type src_line: int
        """
        self.err_list.append(('seg', err_cde, err_str, err_value, src_line))

    def _int(self, str_val):
        """
        Converts a string to an integer

        @type str_val: string
        @return: Int value if successful, None if not
        @rtype: int
        """
        try:
            return int(str_val)
        except (ValueError, TypeError):
            # TypeError covers None and other non-string, non-numeric input
            return None

    def cleanup(self):
        """
        At EOF, check for missing loop trailers

        Adds one error for every ISA, GS or ST loop that is still open.
        Open LS loops are not reported.
        """
        for (seg, id1) in self.loops:
            if seg == 'ST':
                err_str = 'Mandatory segment "Transaction Set Trailer" '
                err_str += '(SE=%s) missing' % (id1)
                self._st_error('2', err_str)
            elif seg == 'GS':
                err_str = 'Mandatory segment "Functional Group Trailer" '
                err_str += '(GE=%s) missing' % (id1)
                self._gs_error('3', err_str)
            elif seg == 'ISA':
                err_str = 'Mandatory segment "Interchange Control Trailer" '
                err_str += '(IEA=%s) missing' % (id1)
                self._isa_error('023', err_str)

    def _first_loop_id(self, loop_type):
        """
        Return the id of the first open loop of the given type, or None
        """
        for loop in self.loops:
            if loop[0] == loop_type:
                return loop[1]
        return None

    def get_isa_id(self):
        """
        Get the current ISA identifier

        @rtype: string
        """
        return self._first_loop_id('ISA')

    def get_gs_id(self):
        """
        Get the current GS identifier

        @rtype: string
        """
        return self._first_loop_id('GS')

    def get_st_id(self):
        """
        Get the current ST identifier

        @rtype: string
        """
        return self._first_loop_id('ST')

    def get_ls_id(self):
        """
        Get the current LS identifier

        @rtype: string
        """
        return self._first_loop_id('LS')

    def get_seg_count(self):
        """
        Get the current segment count

        @rtype: int
        """
        return self.seg_count

    def get_cur_line(self):
        """
        Get the current line

        @rtype: int
        """
        return self.cur_line

    def get_term(self):
        """
        Get the original terminators

        @return: segment, element and sub-element terminator, followed by
            a newline
        @rtype: tuple(string, string, string, string)
        """
        return (self.seg_term, self.ele_term, self.subele_term, '\n')
422