main
Matthew Butterick 10 years ago
parent 2dd7edfb56
commit dac7f26dbb

@ -3,7 +3,7 @@
;; Adapted from work by Peter Norvig ;; Adapted from work by Peter Norvig
;; http://aima-python.googlecode.com/svn/trunk/csp.py ;; http://aima-python.googlecode.com/svn/trunk/csp.py
(require racket/list racket/bool racket/contract racket/class racket/match) (require racket/list racket/bool racket/contract racket/class racket/match racket/generator)
(require "utils.rkt" "search.rkt") (require "utils.rkt" "search.rkt")
(define CSP (class Problem (define CSP (class Problem
@ -128,415 +128,132 @@ This class describes finite-domain Constraint Satisfaction Problems.
;;______________________________________________________________________________ ;;______________________________________________________________________________
;; CSP Backtracking Search ;; CSP Backtracking Search
(define (backtracking_search csp [mcv #f] [lcv #f] [fc #f] [mac #f])
#|
Set up to do recursive backtracking search. Allow the following options:
(define (AC3 csp queue)
(void))
#|
(define (actions csp state)
;; Return a list of applicable actions: nonconflicting
;; assignments to an unassigned variable.
(if (= (length state) (length (hash-ref csp 'vars)))
null
(let ()
(define assignment (make-hash state))
(define var (findf (λ(v) (not (hash-has-key? assignment v))) (hash-ref csp 'vars)))
(map (λ(val) (list var val))
(filter (λ(val) (= 0 (nconflicts csp var val assignment))) (hash-ref (hash-ref csp 'domains) var))))))
|#
#|
def actions(self, state):
"""Return a list of applicable actions: nonconflicting
assignments to an unassigned variable."""
if len(state) == len(self.vars):
return []
else:
assignment = dict(state)
var = find_if(lambda v: v not in assignment, self.vars)
return [(var, val) for val in self.domains[var]
if self.nconflicts(var, val, assignment) == 0]
def result(self, state, (var, val)):
"Perform an action and return the new state."
return state + ((var, val),)
def goal_test(self, state):
"The goal is to assign all vars, with all constraints satisfied."
assignment = dict(state)
return (len(assignment) == len(self.vars) and
every(lambda var: self.nconflicts(var, assignment[var],
assignment) == 0,
self.vars))
## These are for constraint propagation
def support_pruning(self):
"""Make sure we can prune values from domains. (We want to pay
for this only if we use it.)"""
if self.curr_domains is None:
self.curr_domains = dict((v, list(self.domains[v]))
for v in self.vars)
def suppose(self, var, value):
"Start accumulating inferences from assuming var=value."
self.support_pruning()
removals = [(var, a) for a in self.curr_domains[var] if a != value]
self.curr_domains[var] = [value]
return removals
def prune(self, var, value, removals):
"Rule out var=value."
self.curr_domains[var].remove(value)
if removals is not None: removals.append((var, value))
def choices(self, var):
"Return all values for var that aren't currently ruled out."
return (self.curr_domains or self.domains)[var]
def infer_assignment(self):
"Return the partial assignment implied by the current inferences."
self.support_pruning()
return dict((v, self.curr_domains[v][0])
for v in self.vars if 1 == len(self.curr_domains[v]))
def restore(self, removals):
"Undo a supposition and all inferences from it."
for B, b in removals:
self.curr_domains[B].append(b)
## This is for min_conflicts search
def conflicted_vars(self, current):
"Return a list of variables in current assignment that are in conflict"
return [var for var in self.vars
if self.nconflicts(var, current[var], current) > 0]
#______________________________________________________________________________
# CSP Backtracking Search
def backtracking_search(csp, mcv=False, lcv=False, fc=False, mac=False):
"""Set up to do recursive backtracking search. Allow the following options:
mcv - If true, use Most Constrained Variable Heuristic mcv - If true, use Most Constrained Variable Heuristic
lcv - If true, use Least Constraining Value Heuristic lcv - If true, use Least Constraining Value Heuristic
fc - If true, use Forward Checking fc - If true, use Forward Checking
mac - If true, use Maintaining Arc Consistency. [Fig. 5.3] mac - If true, use Maintaining Arc Consistency. [Fig. 5.3]
>>> backtracking_search(australia) >>> backtracking_search(australia)
{'WA': 'B', 'Q': 'B', 'T': 'B', 'V': 'B', 'SA': 'G', 'NT': 'R', 'NSW': 'R'} {'WA': 'B', 'Q': 'B', 'T': 'B', 'V': 'B', 'SA': 'G', 'NT': 'R', 'NSW': 'R'}
""" |#
if fc or mac: (when (or fc mac)
csp.curr_domains, csp.pruned = {}, {} (set-field! curr_domains csp (hash))
for v in csp.vars: (set-field! pruned csp (hash)))
csp.curr_domains[v] = csp.domains[v][:] (set-field! mcv csp mcv)
csp.pruned[v] = [] (set-field! lcv csp lcv)
update(csp, mcv=mcv, lcv=lcv, fc=fc, mac=mac) (set-field! fc csp fc)
return recursive_backtracking({}, csp) (set-field! mac csp mac))
def recursive_backtracking(assignment, csp): (define (recursive_backtracking assignment csp)
"""Search for a consistent assignment for the csp. ;; Search for a consistent assignment for the csp.
Each recursive call chooses a variable, and considers values for it.""" ;; Each recursive call chooses a variable, and considers values for it.
if len(assignment) == len(csp.vars): (cond
return assignment [(= (length assignment) (length (get-field vars csp))) assignment]
var = select_unassigned_variable(assignment, csp) [else
for val in order_domain_values(var, assignment, csp): (define var (select_unassigned_variable assignment csp))
if csp.fc or csp.nconflicts(var, val, assignment) == 0: (define result null)
csp.assign(var, val, assignment) (let/ec done ;; sneaky way of getting return-like functionality
result = recursive_backtracking(assignment, csp) (for ([val (in-list (order_domain_values var assignment csp))])
if result is not None: (when (or (get-field fc csp) (= (send csp nconflicts var val assignment) 0))
return result (send csp assign var val assignment)
csp.unassign(var, assignment) (set! result (recursive_backtracking assignment csp))
return None (when (not (null? result))
(done))
def select_unassigned_variable(assignment, csp): (send csp unassign var assignment)))
"Select the variable to work on next. Find" result)]))
if csp.mcv: # Most Constrained Variable
unassigned = [v for v in csp.vars if v not in assignment]
return argmin_random_tie(unassigned, (define (select_unassigned_variable assignment csp)
lambda var: -num_legal_values(csp, var, assignment)) ;; Select the variable to work on next. Find
else: # First unassigned variable (if (get-field mcv csp) ; most constrained variable
for v in csp.vars: (let ()
if v not in assignment: (define unassigned (filter (λ(v) (not (hash-has-key? assignment v))) (get-field vars csp)))
return v (argmin_random_tie unassigned (λ(var) (* -1 (num_legal_values csp var assignment)))))
;; else first unassigned variable
def order_domain_values(var, assignment, csp): (for/first ([v (in-list (get-field vars csp))] #:when (not (hash-has-key? assignment v)))
"Decide what order to consider the domain variables." v)))
if csp.curr_domains:
domain = csp.curr_domains[var] (define (order_domain_values var assignment csp)
else: ;; Decide what order to consider the domain variables.
domain = csp.domains[var][:] (define domain (if (get-field curr_domains csp)
if csp.lcv: (hash-ref (get-field curr_domains csp) var)
# If LCV is specified, consider values with fewer conflicts first (hash-ref (get-field domains csp) var)))
key = lambda val: csp.nconflicts(var, val, assignment) (when (get-field lcv csp)
domain.sort(lambda(x,y): cmp(key(x), key(y))) ;; If LCV is specified, consider values with fewer conflicts first
while domain: (define key (λ(val) (send csp nconflicts var val assignment)))
yield domain.pop() (set! domain (sort domain < #:key key)))
(generator ()
def num_legal_values(csp, var, assignment): (let loop ([niamod (reverse domain)])
if csp.curr_domains: (yield (car niamod))
return len(csp.curr_domains[var]) (loop (cdr niamod)))))
else:
return count_if(lambda val: csp.nconflicts(var, val, assignment) == 0, (define (num_legal_values csp var assignment)
csp.domains[var]) (if (get-field curr_domains csp)
(length (hash-ref (get-field curr_domains csp) var))
#______________________________________________________________________________ (count_if (λ(val) (= (send csp nconflicts var val assignment) 0)) (hash-ref (get-field domains csp) var))))
# Constraint Propagation with AC-3
def AC3(csp, queue=None):
"""[Fig. 5.7]"""
if queue == None:
queue = [(Xi, Xk) for Xi in csp.vars for Xk in csp.neighbors[Xi]]
while queue:
(Xi, Xj) = queue.pop()
if remove_inconsistent_values(csp, Xi, Xj):
for Xk in csp.neighbors[Xi]:
queue.append((Xk, Xi))
def remove_inconsistent_values(csp, Xi, Xj):
"Return true if we remove a value."
removed = False
for x in csp.curr_domains[Xi][:]:
# If Xi=x conflicts with Xj=y for every possible y, eliminate Xi=x
if every(lambda y: not csp.constraints(Xi, x, Xj, y),
csp.curr_domains[Xj]):
csp.curr_domains[Xi].remove(x)
removed = True
return removed
#______________________________________________________________________________
# Min-conflicts hillclimbing search for CSPs
def min_conflicts(csp, max_steps=1000000):
"""Solve a CSP by stochastic hillclimbing on the number of conflicts."""
# Generate a complete assignement for all vars (probably with conflicts)
current = {}; csp.current = current
for var in csp.vars:
val = min_conflicts_value(csp, var, current)
csp.assign(var, val, current)
# Now repeapedly choose a random conflicted variable and change it
for i in range(max_steps):
conflicted = csp.conflicted_vars(current)
if not conflicted:
return current
var = random.choice(conflicted)
val = min_conflicts_value(csp, var, current)
csp.assign(var, val, current)
return None
def min_conflicts_value(csp, var, current):
"""Return the value that will give var the least number of conflicts.
If there is a tie, choose at random."""
return argmin_random_tie(csp.domains[var],
lambda val: csp.nconflicts(var, val, current))
#______________________________________________________________________________
# Map-Coloring Problems
class UniversalDict:
"""A universal dict maps any key to the same value. We use it here
as the domains dict for CSPs in which all vars have the same domain.
>>> d = UniversalDict(42)
>>> d['life']
42
"""
def __init__(self, value): self.value = value
def __getitem__(self, key): return self.value
def __repr__(self): return '{Any: %r}' % self.value
def different_values_constraint(A, a, B, b):
"A constraint saying two neighboring variables must differ in value."
return a != b
def MapColoringCSP(colors, neighbors):
"""Make a CSP for the problem of coloring a map with different colors
for any two adjacent regions. Arguments are a list of colors, and a
dict of {region: [neighbor,...]} entries. This dict may also be
specified as a string of the form defined by parse_neighbors"""
if isinstance(neighbors, str):
neighbors = parse_neighbors(neighbors)
return CSP(neighbors.keys(), UniversalDict(colors), neighbors,
different_values_constraint)
def parse_neighbors(neighbors, vars=[]):
"""Convert a string of the form 'X: Y Z; Y: Z' into a dict mapping
regions to neighbors. The syntax is a region name followed by a ':'
followed by zero or more region names, followed by ';', repeated for
each region name. If you say 'X: Y' you don't need 'Y: X'.
>>> parse_neighbors('X: Y Z; Y: Z')
{'Y': ['X', 'Z'], 'X': ['Y', 'Z'], 'Z': ['X', 'Y']}
"""
dict = DefaultDict([])
for var in vars:
dict[var] = []
specs = [spec.split(':') for spec in neighbors.split(';')]
for (A, Aneighbors) in specs:
A = A.strip();
dict.setdefault(A, [])
for B in Aneighbors.split():
dict[A].append(B)
dict[B].append(A)
return dict
australia = MapColoringCSP(list('RGB'),
'SA: WA NT Q NSW V; NT: WA Q; NSW: Q V; T: ')
usa = MapColoringCSP(list('RGBY'),
"""WA: OR ID; OR: ID NV CA; CA: NV AZ; NV: ID UT AZ; ID: MT WY UT;
UT: WY CO AZ; MT: ND SD WY; WY: SD NE CO; CO: NE KA OK NM; NM: OK TX;
ND: MN SD; SD: MN IA NE; NE: IA MO KA; KA: MO OK; OK: MO AR TX;
TX: AR LA; MN: WI IA; IA: WI IL MO; MO: IL KY TN AR; AR: MS TN LA;
LA: MS; WI: MI IL; IL: IN; IN: KY; MS: TN AL; AL: TN GA FL; MI: OH;
OH: PA WV KY; KY: WV VA TN; TN: VA NC GA; GA: NC SC FL;
PA: NY NJ DE MD WV; WV: MD VA; VA: MD DC NC; NC: SC; NY: VT MA CA NJ;
NJ: DE; DE: MD; MD: DC; VT: NH MA; MA: NH RI CT; CT: RI; ME: NH;
HI: ; AK: """)
#______________________________________________________________________________
# n-Queens Problem
def queen_constraint(A, a, B, b):
"""Constraint is satisfied (true) if A, B are really the same variable,
or if they are not in the same row, down diagonal, or up diagonal."""
return A == B or (a != b and A + a != B + b and A - a != B - b)
class NQueensCSP(CSP):
"""Make a CSP for the nQueens problem for search with min_conflicts.
Suitable for large n, it uses only data structures of size O(n).
Think of placing queens one per column, from left to right.
That means position (x, y) represents (var, val) in the CSP.
The main structures are three arrays to count queens that could conflict:
rows[i] Number of queens in the ith row (i.e val == i)
downs[i] Number of queens in the \ diagonal
such that their (x, y) coordinates sum to i
ups[i] Number of queens in the / diagonal
such that their (x, y) coordinates have x-y+n-1 = i
We increment/decrement these counts each time a queen is placed/moved from
a row/diagonal. So moving is O(1), as is nconflicts. But choosing
a variable, and a best value for the variable, are each O(n).
If you want, you can keep track of conflicted vars, then variable
selection will also be O(1).
>>> len(backtracking_search(NQueensCSP(8)))
8
>>> len(min_conflicts(NQueensCSP(8)))
8
"""
def __init__(self, n):
"""Initialize data structures for n Queens."""
CSP.__init__(self, range(n), UniversalDict(range(n)),
UniversalDict(range(n)), queen_constraint)
update(self, rows=[0]*n, ups=[0]*(2*n - 1), downs=[0]*(2*n - 1))
def nconflicts(self, var, val, assignment):
"""The number of conflicts, as recorded with each assignment.
Count conflicts in row and in up, down diagonals. If there
is a queen there, it can't conflict with itself, so subtract 3."""
n = len(self.vars)
c = self.rows[val] + self.downs[var+val] + self.ups[var-val+n-1]
if assignment.get(var, None) == val:
c -= 3
return c
def assign(self, var, val, assignment):
"Assign var, and keep track of conflicts."
oldval = assignment.get(var, None)
if val != oldval:
if oldval is not None: # Remove old val if there was one
self.record_conflict(assignment, var, oldval, -1)
self.record_conflict(assignment, var, val, +1)
CSP.assign(self, var, val, assignment)
def unassign(self, var, assignment):
"Remove var from assignment (if it is there) and track conflicts."
if var in assignment:
self.record_conflict(assignment, var, assignment[var], -1)
CSP.unassign(self, var, assignment)
def record_conflict(self, assignment, var, val, delta):
"Record conflicts caused by addition or deletion of a Queen."
n = len(self.vars)
self.rows[val] += delta
self.downs[var + val] += delta
self.ups[var - val + n - 1] += delta
def display(self, assignment):
"Print the queens and the nconflicts values (for debugging)."
n = len(self.vars)
for val in range(n):
for var in range(n):
if assignment.get(var,'') == val: ch ='Q'
elif (var+val) % 2 == 0: ch = '.'
else: ch = '-'
print ch,
print ' ',
for var in range(n):
if assignment.get(var,'') == val: ch ='*'
else: ch = ' '
print str(self.nconflicts(var, val, assignment))+ch,
print
#______________________________________________________________________________
# The Zebra Puzzle
def Zebra(): ;;______________________________________________________________________________
"Return an instance of the Zebra Puzzle." ;; Constraint Propagation with AC-3
Colors = 'Red Yellow Blue Green Ivory'.split()
Pets = 'Dog Fox Snails Horse Zebra'.split()
Drinks = 'OJ Tea Coffee Milk Water'.split() (define (AC3 csp [queue null])
Countries = 'Englishman Spaniard Norwegian Ukranian Japanese'.split() (when (null? queue)
Smokes = 'Kools Chesterfields Winston LuckyStrike Parliaments'.split() (set! queue (for*/list ([Xi (in-list (get-field vars csp))]
vars = Colors + Pets + Drinks + Countries + Smokes [Xk (in-list (hash-ref (get-field neighbors csp) Xi))])
domains = {} (cons Xi Xk))))
for var in vars: (let loop ([eueuq (reverse queue)])
domains[var] = range(1, 6) (when (not (null? eueuq))
domains['Norwegian'] = [1] (match-define (cons Xi Xj) (car eueuq))
domains['Milk'] = [3] (set! eueuq (cdr eueuq)) ;; equivalent to python pop
neighbors = parse_neighbors("""Englishman: Red; (when (remove_inconsistent_values csp Xi Xj)
Spaniard: Dog; Kools: Yellow; Chesterfields: Fox; (set! eueuq
Norwegian: Blue; Winston: Snails; LuckyStrike: OJ; (append
Ukranian: Tea; Japanese: Parliaments; Kools: Horse; (reverse (for/list ([Xk (in-list (hash-ref (get-field neighbors csp) Xi))])
Coffee: Green; Green: Ivory""", vars) (cons Xk Xi)))
for type in [Colors, Pets, Drinks, Countries, Smokes]: eueuq)))
for A in type: (loop eueuq))))
for B in type:
if A != B: (define (remove_inconsistent_values csp Xi Xj)
if B not in neighbors[A]: neighbors[A].append(B) ;; Return true if we remove a value.
if A not in neighbors[B]: neighbors[B].append(A) (define removed #f)
def zebra_constraint(A, a, B, b, recurse=0): (for ([x (in-list (hash-ref (get-field curr_domains csp) Xi))])
same = (a == b) ;; If Xi=x conflicts with Xj=y for every possible y, eliminate Xi=x
next_to = abs(a - b) == 1 (when (every (λ(y) (not (send csp constraints Xi x Xj y)))
if A == 'Englishman' and B == 'Red': return same (hash-ref (get-field curr_domains csp) Xj))
if A == 'Spaniard' and B == 'Dog': return same (hash-update! (get-field curr_domains csp) Xi (λ(val) (remove val x)))
if A == 'Chesterfields' and B == 'Fox': return next_to (set! removed #t)))
if A == 'Norwegian' and B == 'Blue': return next_to removed)
if A == 'Kools' and B == 'Yellow': return same
if A == 'Winston' and B == 'Snails': return same
if A == 'LuckyStrike' and B == 'OJ': return same
if A == 'Ukranian' and B == 'Tea': return same
if A == 'Japanese' and B == 'Parliaments': return same
if A == 'Kools' and B == 'Horse': return next_to
if A == 'Coffee' and B == 'Green': return same
if A == 'Green' and B == 'Ivory': return (a - 1) == b
if recurse == 0: return zebra_constraint(B, b, A, a, 1)
if ((A in Colors and B in Colors) or
(A in Pets and B in Pets) or
(A in Drinks and B in Drinks) or
(A in Countries and B in Countries) or
(A in Smokes and B in Smokes)): return not same
raise 'error'
return CSP(vars, domains, neighbors, zebra_constraint)
def solve_zebra(algorithm=min_conflicts, **args): ;;______________________________________________________________________________
z = Zebra() ;; Min-conflicts hillclimbing search for CSPs
ans = algorithm(z, **args)
for h in range(1, 6): (define (min_conflicts csp [max_steps 1000000])
print 'House', h, ;; Solve a CSP by stochastic hillclimbing on the number of conflicts.
for (var, val) in ans.items(): ;; Generate a complete assignment for all vars (probably with conflicts)
if val == h: print var, (define current (hash))
print (set-field! current csp current)
return ans['Zebra'], ans['Water'], z.nassigns, ans, (for ([var (in-list (get-field vars csp))])
(define val (min_conflicts_value csp var current))
(send csp assign var val current))
|# ;; Now repeatedly choose a random conflicted variable and change it
(define found-result #f)
(let/ec done ;; sneaky way of getting return-like functionality
(for ([i (in-range max_steps)])
(define conflicted (send csp conflicted_vars current))
(when (not conflicted) (set! found-result #t) (done))
(define var (list-ref conflicted (random (length conflicted))))
(define val (min_conflicts_value csp var current))
(send csp assign var val current)))
(and found-result current))
(define (min_conflicts_value csp var current)
;; Return the value that will give var the least number of conflicts.
;; If there is a tie, choose at random.
(argmin_random_tie (hash-ref (get-field domains csp) var)
(λ(val) (send csp nconflicts var val current))))

@ -21,12 +21,23 @@
(module+ test (module+ test
(check-equal? (find_if procedure? (list 3 min max)) min) (check-equal? (find_if procedure? (list 3 min max)) min)
(check-equal? (find_if procedure? (list 1 2 3)) null)) (check-equal? (find_if procedure? (list 1 2 3)) null))
(define (every pred xs) (define (every pred xs)
;;;True if every element of seq satisfies predicate. ;;;True if every element of seq satisfies predicate.
(andmap pred xs)) (andmap pred xs))
(module+ test (module+ test
(check-true (every procedure? (list min max))) (check-true (every procedure? (list min max)))
(check-false (every procedure? (list min 3)))) (check-false (every procedure? (list min 3))))
(define (argmin_random_tie xs proc)
;; Return an element with lowest fn(seq[i]) score; break ties at random.
;; Thus, for all s,f: argmin_random_tie(s, f) in argmin_list(s, f)
(define assocs (map (λ(x) (cons (proc x) x)) xs))
(define min-value (apply min (map car assocs)))
(define min-xs (map cdr (filter (λ(a) (= min-value (car a))) assocs)))
(list-ref min-xs (random (length min-xs))))
;(argmin_random_tie (list (range 0 4) (range 5 9) (range 10 13) (range 20 23)) length)
Loading…
Cancel
Save