Package pyx12 :: Module error_997
[hide private]

Source Code for Module pyx12.error_997

  1  ###################################################################### 
  2  # Copyright (c) 2001-2005 Kalamazoo Community Mental Health Services, 
  3  #   John Holland <jholland@kazoocmh.org> <john@zoner.org> 
  4  # All rights reserved. 
  5  # 
  6  # This software is licensed as described in the file LICENSE.txt, which 
  7  # you should have received as part of this distribution. 
  8  # 
  9  ###################################################################### 
 10   
 11  #    $Id: error_997.py 961 2007-03-23 21:35:20Z johnholland $ 
 12   
 13  """ 
 14  Generates a 997 Response 
 15  Visitor - Visits an error_handler composite 
 16  """ 
 17   
 18  #import os 
 19  #import sys 
 20  from types import * 
 21  import time 
 22  import logging 
 23  #import pdb 
 24   
 25  # Intrapackage imports 
 26  from errors import * 
 27  import error_visitor 
 28  import pyx12.segment 
 29   
# Module-level logger for the 997 generator, named after this module.
# NOTE(review): the level is set to DEBUG here at import time; the ERROR
# alternative is kept commented out below -- confirm which one is intended
# for production use.
logger = logging.getLogger('pyx12.error_997')
logger.setLevel(logging.DEBUG)
#logger.setLevel(logging.ERROR)
 33   
class error_997_visitor(error_visitor.error_visitor):
    """
    Visit an error_handler composite.  Generate a 997.

    Segments are written to the target file in visit order: ISA and GS
    (visit_root_pre), ST/AK1 (visit_gs_pre), AK2 (visit_st_pre), AK3
    (visit_seg), AK4 (visit_ele), AK5 (visit_st_post), AK9/SE
    (visit_gs_post), then GE, an optional TA1 and IEA (visit_root_post).
    """

    def __init__(self, fd, term=('~', '*', '~', '\n')):
        """
        @param fd: target file
        @type fd: object with a C{write} method
        @param term: tuple of x12 terminators used.  Accepted for interface
            compatibility but not applied: the terminators are fixed at
            '~' (segment), '*' (element), ':' (sub-element) and a newline
            after each segment.
        @type term: tuple(string, string, string, string)
        """
        self.fd = fd
        # Fixed terminators; the default `term` carries the same character
        # for the segment and sub-element terminators, so it is not used.
        self.seg_term = '~'
        self.ele_term = '*'
        self.subele_term = ':'
        self.eol = '\n'
        self.seg_count = 0
        self.st_control_num = 0

    def _new_segment(self, seg_str):
        """
        Build a segment using this visitor's terminators.

        @param seg_str: segment id, optionally followed by element values
        @type seg_str: string
        @rtype: L{segment.Segment}
        """
        return pyx12.segment.Segment(seg_str, self.seg_term, self.ele_term,
            self.subele_term)

    def visit_root_pre(self, errh):
        """
        Write the ISA and GS header segments of the response.

        @param errh: Error handler
        @type errh: L{error_handler.err_handler}
        """
        # Sample the clock once so the ISA date/time, the interchange control
        # number and the GS date/time cannot disagree across a minute or day
        # boundary.
        now = time.localtime()
        isa_date = time.strftime('%y%m%d', now)
        isa_time = time.strftime('%H%M', now)
        # yymmddHHMM with the leading digit dropped: nine characters
        self.isa_control_num = (isa_date + isa_time)[1:]

        src_isa = errh.cur_isa_node.seg_data
        # ISA02 and ISA04 are fixed-width ten character fields, left blank.
        blank = ' ' * 10
        isa_seg = self._new_segment('ISA*00*%s*00*%s' % (blank, blank))
        # The source receiver (ISA07/08) is written before the source sender
        # (ISA05/06), i.e. the two id pairs are swapped in the response.
        isa_seg.append(src_isa.get_value('ISA07'))
        isa_seg.append(src_isa.get_value('ISA08'))
        isa_seg.append(src_isa.get_value('ISA05'))
        isa_seg.append(src_isa.get_value('ISA06'))
        isa_seg.append(isa_date)  # Date
        isa_seg.append(isa_time)  # Time
        isa_seg.append(src_isa.get_value('ISA11'))
        isa_seg.append(src_isa.get_value('ISA12'))
        isa_seg.append(self.isa_control_num)  # ISA Interchange Control Number
        isa_seg.append(src_isa.get_value('ISA14'))
        isa_seg.append(src_isa.get_value('ISA15'))
        isa_seg.append(self.subele_term)
        self._write(isa_seg)
        self.isa_seg = isa_seg
        self.gs_loop_count = 0

        # Example: GS*FA*ENCOUNTER*00GR*20030425*150153*653500001*X*004010
        src_gs = errh.cur_gs_node.seg_data
        gs_seg = self._new_segment('GS')
        gs_seg.append('FA')
        # GS03 before GS02: the two application codes are swapped as well
        gs_seg.append(src_gs.get_value('GS03').rstrip())
        gs_seg.append(src_gs.get_value('GS02').rstrip())
        gs_seg.append(time.strftime('%Y%m%d', now))
        gs_seg.append(time.strftime('%H%M%S', now))
        gs_seg.append(src_gs.get_value('GS06'))
        gs_seg.append(src_gs.get_value('GS07'))
        gs_seg.append('004010')
        self._write(gs_seg)
        self.gs_seg = gs_seg
        self.gs_id = src_gs.get_value('GS06')
        self.st_loop_count = 0
        self.gs_loop_count += 1

    def __get_isa_errors(self, err_isa):
        """
        Build list of TA1 level errors
        Only the first error is used

        Codes come from err_isa.errors plus one code per element error,
        looked up by element position.  An element position missing from the
        ISA/IEA maps raises KeyError.

        @param err_isa: ISA Loop error handler
        @type err_isa: L{error_handler.err_isa}
        @return: unique codes in sorted order, so that the first entry is
            the same on every run
        @rtype: list
        """
        isa_ele_err_map = {
            1: '010', 2: '011', 3: '012', 4: '013', 5: '005', 6: '006',
            7: '007', 8: '008', 9: '014', 10: '015', 11: '016', 12: '017',
            13: '018', 14: '019', 15: '020', 16: '027',
        }
        iea_ele_err_map = {1: '021', 2: '018'}
        err_codes = [err[0] for err in err_isa.errors]
        for elem in err_isa.elements:
            for (err_cde, err_str, bad_value) in elem.errors:
                # Ugly: the segment is identified by a substring of the
                # error text
                if 'ISA' in err_str:
                    err_codes.append(isa_ele_err_map[elem.ele_pos])
                elif 'IEA' in err_str:
                    err_codes.append(iea_ele_err_map[elem.ele_pos])
        return sorted(set(err_codes))

    def visit_root_post(self, errh):
        """
        Write the GE trailer, the TA1 segment when err_isa.ta1_req is '1',
        and the IEA trailer.

        @param errh: Error handler
        @type errh: L{error_handler.err_handler}
        """
        self._write(self._new_segment('GE*%i*%s' % (
            self.st_loop_count, self.gs_seg.get_value('GS06'))))
        self.gs_loop_count = 1

        #TA1 segment
        err_isa = errh.cur_isa_node
        if err_isa.ta1_req == '1':
            ta1_seg = self._new_segment('TA1')
            ta1_seg.append(err_isa.isa_trn_set_id)
            ta1_seg.append(err_isa.orig_date)
            ta1_seg.append(err_isa.orig_time)
            err_codes = self.__get_isa_errors(err_isa)
            if err_codes:
                # Rejected; only the first code is reported
                ta1_seg.append('R')
                ta1_seg.append(err_codes[0])
            else:
                ta1_seg.append('A')
                ta1_seg.append('000')
            self._write(ta1_seg)

        self._write(self._new_segment('IEA*%i*%s' % (
            self.gs_loop_count, self.isa_control_num)))

    def visit_isa_pre(self, err_isa):
        """
        No output is produced for this event.

        @param err_isa: ISA Loop error handler
        @type err_isa: L{error_handler.err_isa}
        """

    def visit_isa_post(self, err_isa):
        """
        No output is produced for this event.

        @param err_isa: ISA Loop error handler
        @type err_isa: L{error_handler.err_isa}
        """

    def visit_gs_pre(self, err_gs):
        """
        Open a 997 transaction: write the ST and AK1 segments.

        @param err_gs: GS Loop error handler
        @type err_gs: L{error_handler.err_gs}
        """
        #ST
        self.st_control_num += 1
        self._write(self._new_segment('ST*997*%04i' % self.st_control_num))
        # _write counts every segment written so far; restart the count for
        # this transaction with the ST segment as number one.
        self.seg_count = 1
        self.st_loop_count += 1

        #AK1
        self._write(self._new_segment('AK1*%s*%s' % (
            err_gs.fic, err_gs.gs_control_num)))

    def __get_gs_errors(self, err_gs):
        """
        Build list of GS level errors

        Element errors are mapped by element position; positions without a
        specific mapping yield code '1'.

        @param err_gs: GS Loop error handler
        @type err_gs: L{error_handler.err_gs}
        @return: unique codes, sorted
        @rtype: list
        """
        gs_ele_err_map = {6: '6', 8: '2'}
        ge_ele_err_map = {2: '6'}
        err_codes = [err[0] for err in err_gs.errors]
        for elem in err_gs.elements:
            for (err_cde, err_str, bad_value) in elem.errors:
                # Ugly: the segment is identified by a substring of the
                # error text
                if 'GS' in err_str:
                    err_codes.append(gs_ele_err_map.get(elem.ele_pos, '1'))
                elif 'GE' in err_str:
                    err_codes.append(ge_ele_err_map.get(elem.ele_pos, '1'))
        return sorted(set(err_codes))

    def visit_gs_post(self, err_gs):
        """
        Close the 997 transaction: write the AK9 and SE segments.

        Unset values on err_gs are defaulted in place: ack_code to 'R' and
        the two transaction set counts to 0.

        @param err_gs: GS Loop error handler
        @type err_gs: L{error_handler.err_gs}
        """
        if not err_gs.ack_code:
            err_gs.ack_code = 'R'
        if not err_gs.st_count_orig:
            err_gs.st_count_orig = 0
        if not err_gs.st_count_recv:
            err_gs.st_count_recv = 0

        #AK9
        seg_data = self._new_segment('AK9')
        seg_data.append(err_gs.ack_code)
        seg_data.append('%i' % err_gs.st_count_orig)
        seg_data.append('%i' % err_gs.st_count_recv)
        # Accepted count = received minus failed, never below zero
        count_ok = max(err_gs.st_count_recv - err_gs.count_failed_st(), 0)
        seg_data.append('%i' % count_ok)
        for err_cde in self.__get_gs_errors(err_gs):
            seg_data.append('%s' % err_cde)
        self._write(seg_data)

        #SE
        # The SE segment counts itself, hence the extra one
        seg_count = self.seg_count + 1
        seg_data = self._new_segment('SE')
        seg_data.append('%i' % seg_count)
        seg_data.append('%04i' % self.st_control_num)
        self._write(seg_data)

    def visit_st_pre(self, err_st):
        """
        Write the AK2 segment for a transaction set.

        @param err_st: ST Loop error handler
        @type err_st: L{error_handler.err_st}
        """
        seg_data = self._new_segment('AK2')
        seg_data.append(err_st.trn_set_id)
        seg_data.append(err_st.trn_set_control_num.strip())
        self._write(seg_data)

    def __get_st_errors(self, err_st):
        """
        Build list of ST level errors

        Code '5' is added when the transaction set has child errors.  An
        element position missing from the ST/SE maps raises KeyError.

        @param err_st: ST Loop error handler
        @type err_st: L{error_handler.err_st}
        @return: unique codes, sorted
        @rtype: list
        """
        st_ele_err_map = {1: '6', 2: '7'}
        # NOTE(review): SE01 maps to '6' exactly like ST01 -- confirm that
        # this is the intended AK5 code for an SE01 error.
        se_ele_err_map = {1: '6', 2: '7'}
        err_codes = [err[0] for err in err_st.errors]
        if err_st.child_err_count() > 0:
            err_codes.append('5')
        for elem in err_st.elements:
            for (err_cde, err_str, bad_value) in elem.errors:
                # Ugly: the segment is identified by a substring of the
                # error text
                if 'ST' in err_str:
                    err_codes.append(st_ele_err_map[elem.ele_pos])
                elif 'SE' in err_str:
                    err_codes.append(se_ele_err_map[elem.ele_pos])
        return sorted(set(err_codes))

    def visit_st_post(self, err_st):
        """
        Write the AK5 segment for a transaction set.

        @param err_st: ST Loop error handler
        @type err_st: L{error_handler.err_st}
        @raise EngineError: if err_st.ack_code is None
        """
        if err_st.ack_code is None:
            raise EngineError('err_st.ack_code variable not set')
        seg_data = self._new_segment('AK5')
        seg_data.append(err_st.ack_code)  # AK501
        # AK502-6: at most five error codes are written
        for err_cde in self.__get_st_errors(err_st)[:5]:
            seg_data.append(err_cde)
        self._write(seg_data)

    def visit_seg(self, err_seg):
        """
        Write one AK3 segment per distinct valid segment error code.

        The internal code 'SEG1' is replaced by '8'.  When the segment has
        child errors and '8' is not already among its codes, an extra AK3
        with code '8' is written.

        @param err_seg: Segment error handler
        @type err_seg: L{error_handler.err_seg}
        """
        valid_AK3_codes = ('1', '2', '3', '4', '5', '6', '7', '8')
        seg_base = self._new_segment('AK3')
        seg_base.append(err_seg.seg_id)
        seg_base.append('%i' % err_seg.seg_count)
        seg_base.append(err_seg.ls_id or '')
        # Every AK3 for this segment starts from the same AK301-AK303 prefix
        seg_str = seg_base.format(self.seg_term, self.ele_term,
            self.subele_term)

        errors = [x[0] for x in err_seg.errors]
        if 'SEG1' in errors:
            if '8' not in errors:
                errors.append('8')
            errors = [cde for cde in errors if cde != 'SEG1']
        # Sorted so the AK3 segments come out in the same order on every run
        for err_cde in sorted(set(errors)):
            if err_cde in valid_AK3_codes:
                seg_data = self._new_segment(seg_str)
                seg_data.set('AK304', err_cde)
                self._write(seg_data)
        if err_seg.child_err_count() > 0 and '8' not in errors:
            seg_data = self._new_segment(seg_str)
            seg_data.set('AK304', '8')
            self._write(seg_data)

    def visit_ele(self, err_ele):
        """
        Write one AK4 segment per valid element error code.

        @param err_ele: Element error handler
        @type err_ele: L{error_handler.err_ele}
        """
        valid_AK4_codes = ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
        seg_base = self._new_segment('AK4')
        # AK401: element position, with the sub-element position when set
        if err_ele.subele_pos:
            seg_base.append('%i:%i' % (err_ele.ele_pos, err_ele.subele_pos))
        else:
            seg_base.append('%i' % err_ele.ele_pos)
        if err_ele.ele_ref_num:
            seg_base.append(err_ele.ele_ref_num)
        seg_str = seg_base.format(self.seg_term, self.ele_term,
            self.subele_term)
        for (err_cde, err_str, bad_value) in err_ele.errors:
            if err_cde in valid_AK4_codes:
                seg_data = self._new_segment(seg_str)
                seg_data.set('AK403', err_cde)
                if bad_value:
                    seg_data.set('AK404', bad_value)
                self._write(seg_data)

    def _write(self, seg_data):
        """
        Format a segment, write it to the target file followed by the end of
        line string, and increment the running segment count.

        @param seg_data: Data segment instance
        @type seg_data: L{segment.Segment}
        """
        sout = seg_data.format(self.seg_term, self.ele_term, self.subele_term)
        if seg_data.get_seg_id() == 'ISA':
            # Replace the final character of the formatted ISA with element
            # terminator + sub-element terminator + segment terminator.
            sout = sout[:-1] + self.ele_term + self.subele_term \
                + self.seg_term
        self.fd.write('%s%s' % (sout, self.eol))
        self.seg_count += 1
386