Package pyx12 :: Module x12n_document
[hide private]

Source Code for Module pyx12.x12n_document

  1  ##################################################################### 
  2  # Copyright (c) 2001-2005 Kalamazoo Community Mental Health Services, 
  3  #   John Holland <jholland@kazoocmh.org> <john@zoner.org> 
  4  # All rights reserved. 
  5  # 
  6  # This software is licensed as described in the file LICENSE.txt, which 
  7  # you should have received as part of this distribution. 
  8  # 
  9  ###################################################################### 
 10   
 11  # 
 12  #    $Id: x12n_document.py 1149 2007-08-29 15:10:30Z johnholland $ 
 13   
 14  """ 
 15  Parse an ANSI X12N data file.  Validate against a map and codeset values. 
 16  Create XML, HTML, and 997 documents based on the data file. 
 17  """ 
 18   
 19  import os, os.path, sys 
 20  import logging 
 21  #from types import * 
 22   
 23  # Intrapackage imports 
 24  import pyx12 
 25  import error_handler 
 26  import error_997 
 27  import error_debug 
 28  import error_html 
 29  #import errh_xml 
 30  import errors 
 31  import map_index 
 32  import map_if 
 33  import x12file 
 34  from map_walker import walk_tree 
 35  #from params import params 
 36   
def apply_loop_count(orig_node, new_map):
    """
    Apply loop counts to current map

    Every (path, count) pair reported by orig_node.get_counts_list() is
    written onto the node found at the same path in new_map.  A path for
    which the lookup raises errors.EngineError is logged and skipped, so
    the remaining counts are still applied.

    @param orig_node: node whose counts list is read
    @param new_map: map whose nodes receive the counts
    """
    logger = logging.getLogger('pyx12')
    ct_list = []
    # get_counts_list() fills the list passed to it instead of returning one
    orig_node.get_counts_list(ct_list)
    for (path, ct) in ct_list:
        try:
            curnode = new_map.getnodebypath(path)
            curnode.set_cur_count(ct)
        except errors.EngineError:
            logger.error('getnodebypath failed: path "%s" not found', path)

def reset_isa_counts(cur_map):
    """
    Set the current count of the ISA loop and of its ISA segment to 1

    @param cur_map: map containing '/ISA_LOOP' and '/ISA_LOOP/ISA'
    """
    for path in ('/ISA_LOOP', '/ISA_LOOP/ISA'):
        cur_map.getnodebypath(path).set_cur_count(1)

def reset_gs_counts(cur_map):
    """
    Reset the GS loop counts, then set the GS loop and its GS segment to 1

    @param cur_map: map containing '/ISA_LOOP/GS_LOOP' and
        '/ISA_LOOP/GS_LOOP/GS'
    """
    gs_loop = cur_map.getnodebypath('/ISA_LOOP/GS_LOOP')
    # reset_cur_count() runs first; the loop's own count is then put back to 1
    gs_loop.reset_cur_count()
    gs_loop.set_cur_count(1)
    cur_map.getnodebypath('/ISA_LOOP/GS_LOOP/GS').set_cur_count(1)

def _create_xmldoc(param, fd_xmldoc):
    """
    Build the XML writer selected by the 'xmlout' parameter

    'idtag' and 'idtagqual' select their own writers; 'simple' and any
    other value select the simple writer.  The writer modules are imported
    on demand, as they are only needed when XML output is requested.
    """
    xmlout = param.get('xmlout')
    if xmlout == 'idtag':
        import x12xml_idtag
        return x12xml_idtag.x12xml_idtag(fd_xmldoc, param.get('idtag_dtd'))
    if xmlout == 'idtagqual':
        import x12xml_idtagqual
        return x12xml_idtagqual.x12xml_idtagqual(fd_xmldoc,
            param.get('idtagqual_dtd'))
    # The import must also happen on the fallback path, otherwise an
    # unrecognised 'xmlout' value ends in a NameError.
    import x12xml_simple
    return x12xml_simple.x12xml_simple(fd_xmldoc, param.get('simple_dtd'))


def _drain_error_nodes(err_iter):
    """
    Advance err_iter until it raises IterOutOfBounds; return the nodes seen
    """
    err_node_list = []
    while True:
        try:
            err_iter.next()
            err_node_list.append(err_iter.get_cur_node())
        except pyx12.errors.IterOutOfBounds:
            break
    return err_node_list


def x12n_document(param, src_file, fd_997, fd_html,
        fd_xmldoc=None,
        xslt_files=None):
    """
    Primary X12 validation function

    Walks every segment of src_file through the control map and the
    implementation map chosen from the ISA/GS (and, for 004010X094, BHT)
    values, collecting errors and writing the requested output documents.

    @param param: pyx12.param instance
    @param src_file: Source document
    @type src_file: string
    @param fd_997: 997 output document
    @type fd_997: file descriptor
    @param fd_html: HTML output document
    @type fd_html: file descriptor
    @param fd_xmldoc: XML output document
    @type fd_xmldoc: file descriptor
    @param xslt_files: passed through to map_if.load_map_file for each
        implementation map that is loaded; defaults to an empty list
    @type xslt_files: list
    @return: False if src_file cannot be opened as X12 data, if any segment
        failed validation or if the error handler recorded errors;
        True otherwise
    @rtype: boolean
    @raise pyx12.errors.EngineError: no map matches the ISA/GS/BHT values,
        or the map walker failed on a segment
    """
    if xslt_files is None:
        # A fresh list per call; a shared default list could leak state
        # between calls if a callee ever mutated it.
        xslt_files = []
    map_path = param.get('map_path')
    logger = logging.getLogger('pyx12')
    logger.debug('MAP PATH: %s', map_path)
    errh = error_handler.err_handler()

    # Get X12 DATA file
    try:
        src = x12file.X12file(src_file)
    except pyx12.errors.X12Error:
        logger.error('"%s" does not look like an X12 data file', src_file)
        return False

    # Get Map of Control Segments
    map_file = 'x12.control.00401.xml'
    control_map = map_if.load_map_file(os.path.join(map_path, map_file), param)
    map_index_if = map_index.map_index(os.path.join(map_path, 'maps.xml'))
    node = control_map.getnodebypath('/ISA_LOOP/ISA')
    walker = walk_tree()
    icvn = fic = vriic = tspc = None
    # Bound up front so the final cleanup works even when no GS segment
    # ever caused an implementation map to be loaded.
    cur_map = None
    #XXX Generate TA1 if needed.

    if fd_html:
        html = error_html.error_html(errh, fd_html, src.get_term())
        html.header()
        err_iter = error_handler.err_iter(errh)
    if fd_xmldoc:
        logger.debug('xmlout: %s', param.get('xmlout'))
        xmldoc = _create_xmldoc(param, fd_xmldoc)

    valid = True
    for seg in src:
        # Find the map node for this segment
        orig_node = node
        seg_id = seg.get_seg_id()

        if seg_id == 'ISA':
            node = control_map.getnodebypath('/ISA_LOOP/ISA')
        elif seg_id == 'GS':
            node = control_map.getnodebypath('/ISA_LOOP/GS_LOOP/GS')
        else:
            try:
                node = walker.walk(node, seg, errh, src.get_seg_count(),
                    src.get_cur_line(), src.get_ls_id())
            except errors.EngineError:
                logger.error('Source file line %i', src.get_cur_line())
                raise

        if node is None:
            # No node was found: stay on the previous node and skip validation
            node = orig_node
        else:
            if seg_id == 'ISA':
                errh.add_isa_loop(seg, src)
                icvn = seg.get_value('ISA12')
                errh.handle_errors(src.pop_errors())
            elif seg_id == 'IEA':
                errh.handle_errors(src.pop_errors())
                errh.close_isa_loop(node, seg, src)
                #XXX Generate TA1 if needed.
            elif seg_id == 'GS':
                fic = seg.get_value('GS01')
                vriic = seg.get_value('GS08')
                map_file_new = map_index_if.get_filename(icvn, vriic, fic)
                # NOTE(review): the nesting of reset_isa_counts /
                # reset_gs_counts below was rebuilt from a flattened
                # listing - confirm against the upstream file.
                if map_file != map_file_new:
                    map_file = map_file_new
                    if map_file is None:
                        raise pyx12.errors.EngineError(
                            "Map not found. icvn=%s, fic=%s, vriic=%s" %
                            (icvn, fic, vriic))
                    cur_map = map_if.load_map_file(map_file, param, xslt_files)
                    logger.debug('Map file: %s', map_file)
                    apply_loop_count(orig_node, cur_map)
                    reset_isa_counts(cur_map)
                reset_gs_counts(cur_map)
                node = cur_map.getnodebypath('/ISA_LOOP/GS_LOOP/GS')
                errh.add_gs_loop(seg, src)
                errh.handle_errors(src.pop_errors())
            elif seg_id == 'BHT':
                # For these two versions the map lookup also uses BHT02
                if vriic in ('004010X094', '004010X094A1'):
                    tspc = seg.get_value('BHT02')
                    logger.debug('icvn=%s, fic=%s, vriic=%s, tspc=%s',
                        icvn, fic, vriic, tspc)
                    map_file_new = map_index_if.get_filename(icvn, vriic,
                        fic, tspc)
                    logger.debug('New map file: %s', map_file_new)
                    if map_file != map_file_new:
                        map_file = map_file_new
                        if map_file is None:
                            raise pyx12.errors.EngineError(
                                "Map not found. icvn=%s, fic=%s, vriic=%s, tspc=%s" %
                                (icvn, fic, vriic, tspc))
                        cur_map = map_if.load_map_file(map_file, param,
                            xslt_files)
                        logger.debug('Map file: %s', map_file)
                        apply_loop_count(node, cur_map)
                        node = cur_map.getnodebypath(
                            '/ISA_LOOP/GS_LOOP/ST_LOOP/HEADER/BHT')
                errh.add_seg(node, seg, src.get_seg_count(),
                    src.get_cur_line(), src.get_ls_id())
                errh.handle_errors(src.pop_errors())
            elif seg_id == 'GE':
                errh.handle_errors(src.pop_errors())
                errh.close_gs_loop(node, seg, src)
            elif seg_id == 'ST':
                errh.add_st_loop(seg, src)
                errh.handle_errors(src.pop_errors())
            elif seg_id == 'SE':
                errh.handle_errors(src.pop_errors())
                errh.close_st_loop(node, seg, src)
            else:
                errh.add_seg(node, seg, src.get_seg_count(),
                    src.get_cur_line(), src.get_ls_id())
                errh.handle_errors(src.pop_errors())

            # is_valid() must run for every segment, so no short-circuit here
            valid &= node.is_valid(seg, errh)

        if fd_html:
            if node is not None and node.is_first_seg_in_loop():
                html.loop(node.get_parent())
            html.gen_seg(seg, src, _drain_error_nodes(err_iter))

        if fd_xmldoc:
            xmldoc.seg(node, seg)

    src.cleanup() #Catch any skipped loop trailers
    errh.handle_errors(src.pop_errors())

    if fd_html:
        html.footer()
        del html

    if fd_xmldoc:
        del xmldoc

    # If this transaction is not a 997, generate one.
    if fd_997 and not (vriic == '004010' and fic == 'FA'):
        visit_997 = error_997.error_997_visitor(fd_997, src.get_term())
        errh.accept(visit_997)
        del visit_997
    del node
    del src
    del control_map
    del cur_map
    try:
        if not valid or errh.get_error_count() > 0:
            return False
        else:
            return True
    except Exception:
        # Best effort: a failing error handler counts as an invalid document
        logger.exception('Unable to read the error count from %s', errh)
        return False
