#!/usr/bin/env python # (C) 2008 Jacob Joseph # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # Nov 29, 2007 # An attempt at reading the gas meter dials import os, time, copy, shutil, Queue import Image, ImageFilter, ImageEnhance, ImageDraw, ImageOps class capture: gauge_box = None # bounding box of the gauge panel dial_centers = None # Dictionary keyed by dial unit, of dial center coordinates dial_radius = None # Radius in pixels of a dial dev = None # '/dev/video0' tmpfile = None # temporary location for captured jpeg debug = None # print a bit more information debug_nocap = None # If true, no new images will be acquired debug_logcappath = None # write all captured images to this path full = None # full image dials = None # image of all dials # current reading cur_time = None # unix time of most recent reading dial_values = None # current gauge values dial_angles = None # current gauge angles reading = None # The final value, as one float # a past reading, with difference great enough to avoid bouncing prev_values = None rot_count_adj = None # rotation counts, adjusted to assure # non-decreasing values rot_count = None # rotation counts, never calibrated. for rate cumm_time = None # last cumulative reset cumm_count = None # cumulative rotation count def reset_cumm_count(self): self.cumm_time = self.cur_time self.cumm_count = {0.5: 0, 2: 0} def get_cumm_count(self): cnt_tmp = copy.copy(self.cumm_count) self.reset_cumm_count() return cnt_tmp def __init__(self, gauge_box, dial_centers, dial_radius, dev='/dev/video0', tempfile='temp.jpeg', debug = False, debug_nocap = False, debug_logcappath = None): self.gauge_box = gauge_box self.dial_centers = dial_centers self.dial_radius = dial_radius self.dev = dev self.tempfile = tempfile self.debug = debug self.debug_nocap = debug_nocap self.debug_logcappath = debug_logcappath def get_reading(self): """Snap a picture and process a new reading. Returns (value,""" self.snap() self.reading = 0 self.cur_time = time.time() da = self.dial_angles = {} cv = self.dial_values = {} self.dials.save('0_dials.jpeg') dr = self.dial_radius for (factor, (dx,dy,rot)) in dial_centers.items(): # crop and enhance the dial image # #FIXME: better thresholding techniques should certainly be #used, not to mention proper registration of the dial #coordiantes. For now, the camera is well secured, and #lighting constant. im_dial = self.dials.crop( (dx-dr, dy-dr, dx+dr, dy+dr)) #im_dial.save('1_crop_%s.jpeg' % factor) im_dial = ImageOps.equalize(im_dial) #im_dial.save('2_eq_%s.jpeg' % factor) im_dial = ImageOps.grayscale(im_dial).point(lambda i: 0 if i < 60 else 255) #im_dial.save('3_thresh_%s.jpeg' % factor) im_dial = im_dial.filter(ImageFilter.ModeFilter) #im_dial.save('4_filter_%s.jpeg' % factor) im_dial = ImageOps.invert(im_dial) #im_dial.save('5_invert_%s.jpeg' % factor) (angle, arcsize) = self.findangle_pie(im_dial, arcsize=15) da[factor] = angle + float(arcsize)/2 cv[factor] = self.angle_to_float( da[factor], rot) if self.debug: print "Factor angle: ", factor, angle self.show_pie(im_dial, angle, arcsize) # process the raw values self.count_rotations() self.disambiguate_dials() # compute the final value # factors are per revolution, so divide by ten for factor in (1000000, 100000, 10000, 1000): self.reading += factor/10 * int(self.prev_values[factor]) self.reading += 2 * (self.rot_count_adj[2] % 50) + (cv[2] * 0.2) #value += 0.5 * (self.cnt_half % 4) if self.debug: self.show_overlay() return (self.cur_time, self.reading) def disambiguate_dials(self): "Reconcile ambiguous dial values with finer dial readings." cv = self.dial_values pv = self.prev_values # disambiguate the 2ft dial using the half count if int(cv[2] + 0.2) > int(cv[2]) and (self.rot_count_adj[0.5] % 4) > 1: cv[2] = (int(cv[2]) + 1) % 10 else: cv[2] = int(cv[2]) # if close to the next value, rely upon the next dial to # disambiguate only really applies to the upper row. Increment # only once for factor in (1000, 10000, 100000, 1000000): # increased by >1, or passed zero if int(cv[factor]) > int(pv[factor]) or ( int(pv[factor] > 5) and int(cv[factor]) < 5): # reset the counters, if on the 1000 dial if factor==1000: pv[factor] = cv[factor] self.rot_count_adj[2] += 50 - (self.rot_count_adj[2] % 50) self.rot_count_adj[0.5] += 4 - (self.rot_count_adj[0.5] % 4) # increment only if near a digit, and the next smaller # dial is at a low value. # FIXME: just update whenever the previous dial wraps elif pv[factor/10] < 5: pv[factor] = cv[factor] elif cv[factor] > (pv[factor] + 0.5) % 10: pv[factor] = cv[factor] return def count_rotations(self): """Count rotations of the half and two foot dials. Also, synchronize with the 1000ft dial when it is near a digit.""" # Note that if more than about 15 seconds have passed, the # half gauge may have looped. This is not currently # considered t = self.cur_time cv = self.dial_values pv = self.prev_values # if no previous value, store the current and move on if self.prev_values is None: self.rot_count_adj = { 0.5: 0, 2: 0} self.rot_count = { 0.5: 0, 2: 0} self.reset_cumm_count() self.prev_values = copy.copy(cv) cv[1000] = int(cv[1000]) cv[2] = int(cv[2]) return # increment these counts if they passed zero for factor in (0.5, 2): # passed zero? if int(pv[factor]) > 5 and int(cv[factor]) < 5: self.rot_count_adj[factor] += 1 self.rot_count[factor] += 1 self.cumm_count[factor] += 1 self.prev_values[factor] = cv[factor] # if more than 0.5 past last, update. 0.5 should be greater than any noise elif cv[factor] > (pv[factor] + 0.5) % 10: pv[factor] = cv[factor] return def snap(self, nocapture=False): """Use v4lctl to capture a new image to disk.""" self.cur_time = time.time() if not self.debug_nocap: os.system( "v4lctl -c %s snap jpeg full %s" % (self.dev, self.tempfile)) if self.debug_logcappath is not None: shutil.copyfile( self.tempfile, self.debug_logcappath + '/' + str(int(self.cur_time)) + '.jpeg') self.full = Image.open(self.tempfile) #self.full = self.full.rotate(-90) self.dials = self.full.crop(self.gauge_box) return def angle_to_float(self, angle, rot): """Convert a gauge angle to a float. rot is either 'cw' or 'ccw'.""" if rot == 'ccw': return (-float(angle) / 36 + 7.5) % 10 elif rot == 'cw': return (float(angle) / 36 + 2.5) % 10 def show_pie(self, img, angle, arcsize): """Draw a pie slice at the center of an image.""" print "Drawing Angle: %d - %d" % (angle, angle+arcsize) imgc = img.copy() imgc = imgc.convert("RGB") draw = ImageDraw.Draw(imgc) draw.pieslice( (0,0,imgc.size[0],imgc.size[1]), angle, angle+arcsize, outline="#00ff00") imgc.show() return def show_overlay(self): """Show the current value overlaid on the current image. Mostly for debugging.""" img_c = self.full.copy().convert("RGB") draw = ImageDraw.Draw(img_c) draw.rectangle(self.gauge_box, outline='#00FF00') dr = self.dial_radius bx, by = self.gauge_box[:2] for (factor,(dx,dy,rot)) in self.dial_centers.items(): if self.dial_angles is not None: angle = self.dial_angles[factor] else: angle = 0 draw.pieslice((dx-dr+bx, dy-dr+by, dx+dr+bx, dy+dr+by), angle, angle-1, outline='#00FF00') if self.dial_values is not None: digit = self.dial_values[factor] draw.text((dx+bx-10, dy+by-dr-15), "%0.2f" % digit, fill='#00FF00') # show only a portion around the bounding box crop_box = [-30, -30, 30, 30] for (i,val) in enumerate(self.gauge_box): crop_box[i] += val img_c.crop(crop_box) img_c.show() return def findangle_pie(self, img, arcsize = 10, verbose=False): """Fine the needle angle by fitting a small pie slice to it""" angles = range(0,360, max(arcsize/2,1)) mincnt = img.size[0]*img.size[1] minangle = None for angle in angles: imgc = img.copy() draw = ImageDraw.Draw(imgc) draw.pieslice( (0,0,img.size[0],img.size[1]), angle, angle+arcsize, fill=0) count = imgc.histogram()[255] if count < mincnt: mincnt = count minangle = angle if self.debug and verbose: print "Angle, count:", angle, count self.show_pie(img, angle, arcsize) return (minangle, arcsize) # This didn't work so well... def findangle_line(self, img): xs = [float(a) * 20 for a in range(-10,10)] ys = [float(a) * 20 for a in range(-10,10)] mincnt = img.size[0]*img.size[1] minoffset = (None,None) center = tuple([a / 2 for a in img.size]) for x_off in xs: for y_off in ys: imgc = img.copy() draw = ImageDraw.Draw(imgc) endpoint = (center[0] + x_off, center[1] + y_off) draw.line( (center, endpoint), fill=0) count = imgc.histogram()[255] print "min, count:", mincnt, count if count < mincnt: mincnt = count minoffset = (x_off, y_off) print "x_off, y_off:", x_off, y_off imgc.show() return if __name__ == '__main__': # hardcoded coordinates of an arbitrary bounding box to containing # the dials, dial centers, and the dial radius. gb = (30, 275, 330, 450) dial_centers = {1000000 : (59, 48, 'ccw'), 100000 : (126, 47, 'cw'), 10000 : (193, 47, 'ccw'), 1000 : (260, 46, 'cw'), 2 : (39, 137, 'ccw'), 0.5 : (112, 137, 'ccw')} dial_radius = 31 # Print debug information while parsing, and display images debug=False # Don't capture new images. Use temp.jpeg from the current directory debug_nocap=True # Store captured images in this path debug_logcappath=os.path.expandvars('$HOME/tmp/images/') # Initialize the capture class c_cap = capture(gb, dial_centers, dial_radius, debug=debug, debug_nocap=debug_nocap, debug_logcappath=debug_logcappath) t_last_cumm = None t_last = None value_last = None dial_last = None log_out = open("meter.out",'a', buffering=1) while True: (t, value) = c_cap.get_reading() t = int(t) dial_values = c_cap.dial_values prev_values = c_cap.prev_values if dial_last is None or value != value_last: print dial_values t_last = t dial_last = dial_values value_last = value print >> log_out,"%d %0.2f - %0.3f - %0.3f %d %d - %0.3f %d %d" % ( t, value, prev_values[1000], prev_values[2], c_cap.rot_count_adj[2], c_cap.rot_count[2], prev_values[0.5], c_cap.rot_count_adj[0.5], c_cap.rot_count[0.5]) if debug_nocap: break time.sleep(5)