Package pyx12 :: Module map_walker
[hide private]

Source Code for Module pyx12.map_walker

  1  ###################################################################### 
  2  # Copyright (c) 2001-2005 Kalamazoo Community Mental Health Services, 
  3  #   John Holland <jholland@kazoocmh.org> <john@zoner.org> 
  4  # All rights reserved. 
  5  # 
  6  # This software is licensed as described in the file LICENSE.txt, which 
  7  # you should have received as part of this distribution. 
  8  # 
  9  ###################################################################### 
 10   
 11  #    $Id: map_walker.py 1147 2007-08-29 04:41:53Z johnholland $ 
 12   
 13  """ 
 14  Walk a tree of x12_map nodes.  Find the correct node. 
 15   
 16  If seg indicates a loop has been entered, returns the first child segment node. 
 17  If seg indicates a segment has been entered, returns the segment node. 
 18  """ 
 19   
 20  import logging 
 21  import pdb 
 22   
 23  # Intrapackage imports 
 24  from errors import * 
 25  import pyx12.segment 
 26   
 27  logger = logging.getLogger('pyx12.walk_tree') 
 28  #logger.setLevel(logging.DEBUG) 
 29  #logger.setLevel(logging.ERROR) 
 30   
31 -def pop_to_parent_loop(node):
32 """ 33 @param node: Loop Node 34 @type node: L{node<map_if.x12_node>} 35 @return: Closest parent loop node 36 @rtype: L{node<map_if.x12_node>} 37 """ 38 if node.is_map_root(): 39 return node 40 map_node = node.parent 41 if map_node is None: 42 raise EngineError, "Node is None: %s" % (node.name) 43 while not (map_node.is_loop() or map_node.is_map_root()): 44 map_node = map_node.parent 45 if not (map_node.is_loop() or map_node.is_map_root()): 46 raise EngineError, "Called pop_to_parent_loop, can't find parent loop" 47 return map_node
48
49 -def is_first_seg_match2(child, seg_data):
50 """ 51 Find the first segment in loop, verify it matches segment 52 53 @param child: child node 54 @type child: L{node<map_if.x12_node>} 55 @param seg_data: Segment object 56 @type seg_data: L{segment<segment.Segment>} 57 @rtype: boolean 58 """ 59 if child.is_segment(): 60 if child.is_match(seg_data): 61 return True 62 else: 63 return False # seg does not match the first segment in loop, so not valid 64 return False
65 66
67 -class walk_tree(object):
68 """ 69 Walks a map_if tree. Tracks loop/segment counting, missing loop/segment. 70 """
71 - def __init__(self):
72 # end_tag_stack = [] 73 #self.cur_seg_count = 0 74 self.mandatory_segs_missing = [] # Store errors until we know we have an error 75 pass
76
77 - def walk(self, node, seg_data, errh, seg_count, cur_line, ls_id):
78 """ 79 Walk the node tree from the starting node to the node matching 80 seg_data. Catch any counting or requirement errors along the way. 81 82 Handle required segment/loop missed (not found in seg) 83 Handle found segment = Not used 84 85 @param node: Starting node 86 @type node: L{node<map_if.x12_node>} 87 @param seg_data: Segment object 88 @type seg_data: L{segment<segment.Segment>} 89 @param seg_count: Count of current segment in the ST Loop 90 @type seg_count: int 91 @param cur_line: Current line number in the file 92 @type cur_line: int 93 @param ls_id: The current LS loop identifier 94 @type ls_id: string 95 @return: The matching x12_node 96 @rtype: L{node<map_if.x12_node>} 97 98 @todo: check single segment loop repeat 99 """ 100 101 #if seg_data.get_seg_id() == 'LX': 102 # pdb.set_trace() 103 #logger.debug('start walk %s' % (node)) 104 orig_node = node 105 #logger.info('%s seg_count=%i / cur_line=%i' % (node.id, seg_count, cur_line)) 106 self.mandatory_segs_missing = [] 107 node_pos = node.pos # Get original position ordinal of starting node 108 if not (node.is_loop() or node.is_map_root()): 109 node = pop_to_parent_loop(node) # Get enclosing loop 110 while True: 111 #logger.debug('seg_data.id % ' % (seg_data.get_seg_id())) 112 # Iterate through nodes with position >= current position 113 pos_keys = filter(lambda a: a>= node_pos, node.pos_map.keys()) 114 pos_keys.sort() 115 for ord1 in pos_keys: 116 #for child in node.children: 117 #logger.debug('id=%s child.index=%i node_pos=%i' % \ 118 # (child.id, child.index, node_pos)) 119 for child in node.pos_map[ord1]: 120 #if child.pos >= node_pos: 121 if child.is_segment(): 122 #logger.debug('id=%s cur_count=%i max_repeat=%i' \ 123 # % (child.id, child.cur_count, child.get_max_repeat())) 124 if child.is_match(seg_data): 125 # Is the matched segment the beginning of a loop? 126 if node.is_loop() \ 127 and self._is_loop_match(node, seg_data, errh, seg_count, cur_line, ls_id): 128 node1 = self._goto_seg_match(node, seg_data, \ 129 errh, seg_count, cur_line, ls_id) 130 return node1 131 #return node1.get_child_node_by_idx(0) 132 child.incr_cur_count() 133 #logger.debug('MATCH segment %s (%s*%s)' % (child.id,\ 134 # seg_data.get_seg_id(), seg_data[0].get_value())) 135 if child.usage == 'N': 136 err_str = "Segment %s found but marked as not used" % (child.id) 137 errh.seg_error('2', err_str, None) 138 elif child.usage == 'R' or child.usage == 'S': 139 if child.get_cur_count() > child.get_max_repeat(): # handle seg repeat count 140 err_str = "Segment %s exceeded max count. Found %i, should have %i" \ 141 % (seg_data.get_seg_id(), child.get_cur_count(), child.get_max_repeat()) 142 errh.add_seg(child, seg_data, seg_count, cur_line, ls_id) 143 errh.seg_error('5', err_str, None) 144 else: 145 raise EngineError, 'Usage must be R, S, or N' 146 # Remove any previously missing errors for this segment 147 self.mandatory_segs_missing = filter(lambda x: x[0]!=child, 148 self.mandatory_segs_missing) 149 self._flush_mandatory_segs(errh, child.pos) 150 return child 151 elif child.usage == 'R' and child.get_cur_count() < 1: 152 fake_seg = pyx12.segment.Segment('%s'% (child.id), '~', '*', ':') 153 #errh.add_seg(child, fake_seg, seg_count, cur_line, ls_id) 154 err_str = 'Mandatory segment "%s" (%s) missing' % (child.name, child.id) 155 self.mandatory_segs_missing.append((child, fake_seg, 156 '3', err_str, seg_count, cur_line, ls_id)) 157 #else: 158 #logger.debug('Segment %s is not a match for (%s*%s)' % \ 159 # (child.id, seg_data.get_seg_id(), seg_data[0].get_value())) 160 elif child.is_loop(): 161 #logger.debug('child_node id=%s' % (child.id)) 162 if self._is_loop_match(child, seg_data, errh, \ 163 seg_count, cur_line, ls_id): 164 node_seg = self._goto_seg_match(child, seg_data, \ 165 errh, seg_count, cur_line, ls_id) 166 #node_seg = child.get_child_node_by_idx(0) 167 return node_seg 168 if node.is_map_root(): # If at root and we haven't found the segment yet. 169 self._seg_not_found(orig_node, seg_data, errh, seg_count, \ 170 cur_line, ls_id) 171 return None 172 node_pos = node.pos # Get position ordinal of current node in tree 173 node = pop_to_parent_loop(node) # Get enclosing parent loop 174 175 self._seg_not_found(orig_node, seg_data, errh, seg_count, cur_line, ls_id) 176 return None
177 178 # def _is_loop_repeat(self, node, seg_data): 179 # if not (node.is_loop() or node.is_map_root()): 180 # node = pop_to_parent_loop(node) # Get enclosing loop 181 # return self._is_first_seg_match(node, seg_data): 182
183 - def _seg_not_found(self, orig_node, seg_data, errh, seg_count, cur_line, ls_id):
184 """ 185 Create error for not found segments 186 187 @param orig_node: Original starting node 188 @type orig_node: L{node<map_if.x12_node>} 189 @param seg_data: Segment object 190 @type seg_data: L{segment<segment.Segment>} 191 @param errh: Error handler 192 @type errh: L{error_handler.err_handler} 193 """ 194 if seg_data.get_seg_id() == 'HL': 195 seg_str = seg_data.format('', '*', ':') 196 else: 197 seg_str = '%s*%s' % (seg_data.get_seg_id(), seg_data.get_value('01')) 198 err_str = 'Segment %s not found. Started at %s' % (seg_str, orig_node.get_path()) 199 errh.add_seg(orig_node, seg_data, seg_count, cur_line, ls_id) 200 errh.seg_error('1', err_str, None)
201 #raise EngineError, "Could not find segment %s*%s. Started at %s" % \ 202 # (seg.get_seg_id(), seg[1], orig_node.get_path()) 203 204 # def _is_first_seg_match(self, node, seg_data): 205 # """ 206 # Find the first segment in loop, verify it matches segment 207 # @rtype: boolean 208 # """ 209 # if not node.is_loop(): raise EngineError, \ 210 # "Call to first_seg_match failed, node is not a loop" 211 # child = node.get_child_node_by_idx(0) 212 # if child.is_segment(): 213 # if child.is_match(seg_data): 214 # return True 215 # else: 216 # return False # seg does not match the first segment in loop, so not valid 217 # return False 218
219 - def _flush_mandatory_segs(self, errh, cur_pos = None):
220 """ 221 Handle error reporting for any outstanding missing mandatory segments 222 223 @param errh: Error handler 224 @type errh: L{error_handler.err_handler} 225 """ 226 #if self.mandatory_segs_missing: pdb.set_trace() 227 for (seg_node, seg_data, err_cde, err_str, 228 seg_count, cur_line, ls_id) in self.mandatory_segs_missing: 229 # Create errors if not also at current position 230 if seg_node.pos != cur_pos: 231 errh.add_seg(seg_node, seg_data, seg_count, cur_line, ls_id) 232 errh.seg_error(err_cde, err_str, None) 233 self.mandatory_segs_missing = filter(lambda x: x[0].pos==cur_pos, 234 self.mandatory_segs_missing)
235 #self.mandatory_segs_missing = [] 236 237 #def _is_loop_match(self, loop_node, seg_data, errh, seg_count, cur_line, ls_id): 238 # if not loop_node.is_loop(): raise EngineError, \ 239 # "Call to first_seg_match failed, node %s is not a loop. seg %s" \ 240 # % (loop_node.id, seg_data.get_seg_id()) 241
242 - def _is_loop_match(self, loop_node, seg_data, errh, seg_count, cur_line, ls_id):
243 """ 244 Try to match the current loop to the segment 245 Handle loop and segment counting. 246 Check for used/missing 247 248 @param loop_node: Loop Node 249 @type loop_node: L{node<map_if.loop_if>} 250 @param seg_data: Segment object 251 @type seg_data: L{segment<segment.Segment>} 252 @param errh: Error handler 253 @type errh: L{error_handler.err_handler} 254 255 @return: Does the segment match the first segment node in the loop? 256 @rtype: boolean 257 """ 258 #if seg_data.get_seg_id() == 'HL': 259 # pdb.set_trace() 260 if not loop_node.is_loop(): raise EngineError, \ 261 "Call to first_seg_match failed, node %s is not a loop. seg %s" \ 262 % (loop_node.id, seg_data.get_seg_id()) 263 if len(loop_node) <= 0: # Has no children 264 return False 265 #first_child_node = loop_node.get_child_node_by_idx(0) 266 pos_keys = loop_node.pos_map.keys() 267 pos_keys.sort() 268 #min_pos = reduce(min, loop_node.pos_map.keys()) 269 first_child_node = loop_node.pos_map[pos_keys[0]][0] 270 if first_child_node.is_loop(): 271 #If any loop node matches 272 for ord1 in pos_keys: 273 for child_node in loop_node.pos_map[ord1]: 274 if child_node.is_loop() and self._is_loop_match(child_node, \ 275 seg_data, errh, seg_count, cur_line, ls_id): 276 return True 277 elif is_first_seg_match2(first_child_node, seg_data): 278 return True 279 elif loop_node.usage == 'R' and loop_node.get_cur_count() < 1: 280 #pdb.set_trace() 281 fake_seg = pyx12.segment.Segment('%s' % \ 282 (first_child_node.id), '~', '*', ':') 283 #errh.add_seg(first_child_node, fake_seg, seg_count, cur_line, ls_id) 284 err_str = 'Mandatory loop "%s" (%s) missing' % \ 285 (loop_node.name, loop_node.id) 286 self.mandatory_segs_missing.append((first_child_node, fake_seg, \ 287 '3', err_str, seg_count, cur_line, ls_id)) 288 #errh.seg_error('3', err_str, None) 289 return False
290
291 - def _goto_seg_match(self, loop_node, seg_data, errh, seg_count, cur_line, ls_id):
292 """ 293 A child loop has matched the segment. Return that segment node. 294 Handle loop counting and requirement errors. 295 296 @param loop_node: The starting loop node. 297 @type loop_node: L{node<map_if.loop_if>} 298 @param seg_data: Segment object 299 @type seg_data: L{segment<segment.Segment>} 300 @param errh: Error handler 301 @type errh: L{error_handler.err_handler} 302 @param seg_count: Current segment count for ST loop 303 @type seg_count: int 304 @param cur_line: File line counter 305 @type cur_line: int 306 @type ls_id: string 307 308 @return: The matching segment node 309 @rtype: L{node<map_if.segment_if>} 310 """ 311 if not loop_node.is_loop(): raise EngineError, \ 312 "_goto_seg_match failed, node %s is not a loop. seg %s" \ 313 % (loop_node.id, seg_data.get_seg_id()) 314 #first_child_node = loop_node.get_child_node_by_idx(0) 315 pos_keys = loop_node.pos_map.keys() 316 pos_keys.sort() 317 first_child_node = loop_node.pos_map[pos_keys[0]][0] 318 if is_first_seg_match2(first_child_node, seg_data): 319 if loop_node.usage == 'N': 320 err_str = "Loop %s found but marked as not used" % (loop_node.id) 321 errh.seg_error('2', err_str, None) 322 elif loop_node.usage in ('R', 'S'): 323 loop_node.reset_child_count() 324 loop_node.incr_cur_count() 325 #logger.debug('incr loop_node %s %i' % (loop_node.id, loop_node.cur_count)) 326 first_child_node.incr_cur_count() 327 #logger.debug('incr first_child_node %s %i' % (first_child_node.id, first_child_node.cur_count)) 328 if loop_node.get_cur_count() > loop_node.get_max_repeat(): 329 err_str = "Loop %s exceeded max count. Found %i, should have %i" \ 330 % (loop_node.id, loop_node.get_cur_count(), loop_node.get_max_repeat()) 331 errh.add_seg(loop_node, seg_data, seg_count, cur_line, ls_id) 332 errh.seg_error('4', err_str, None) 333 #logger.debug('MATCH Loop %s / Segment %s (%s*%s)' \ 334 # % (child.id, first_child_node.id, seg_data.get_seg_id(), seg[0].get_value())) 335 else: 336 raise EngineError, 'Usage must be R, S, or N' 337 self._flush_mandatory_segs(errh) 338 return first_child_node 339 else: 340 #for child in loop_node.children: 341 for ord1 in pos_keys: 342 for child in loop_node.pos_map[ord1]: 343 if child.is_loop(): 344 node1 = self._goto_seg_match(child, seg_data, errh, \ 345 seg_count, cur_line, ls_id) 346 if node1: 347 return node1 348 return None
349