tvl-depot/users/wpcarro/scratch/simple-select/main.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

230 lines
5.9 KiB
Python
Raw Normal View History

import sqlite3
import string
from scanner import Scanner
from parser import Parser
################################################################################
# Predicates
################################################################################
def is_alpha(c):
return c in string.ascii_letters
def is_digit(c):
return c in "0123456789"
def is_alphanumeric(c):
return is_alpha(c) or is_digit(c)
def is_whitespace(c):
return c in " \r\t\n"
################################################################################
# Tokenizer
################################################################################
AND = ("CONJUNCTION", "AND")
OR = ("CONJUNCTION", "OR")
NOT = ("PUNCTUATION", "NOT")
COLON = ("PUNCTUATION", "COLON")
LPAREN = ("PUNCTUATION", "LPAREN")
RPAREN = ("PUNCTUATION", "RPAREN")
def tokenize(x):
s = Scanner(x)
tokens = scan_tokens(s)
return tokens
def scan_tokens(s):
result = []
while not s.exhausted():
if is_whitespace(s.peek()):
s.advance()
else:
result.append(scan_token(s))
return result
def scan_token(s):
punctuation = {
"-": NOT,
":": COLON,
"(": LPAREN,
")": RPAREN,
}
c = s.peek()
if c in punctuation:
s.advance()
return punctuation[c]
if c == "\"":
return tokenize_string(s)
if c == "/":
return tokenize_regex(s)
if is_alpha(c):
return tokenize_identifier(s)
def tokenize_string(s):
s.advance() # ignore opening 2x-quote
current = ""
while s.peek() != "\"" and not s.exhausted():
current += s.advance()
if s.exhausted():
raise Exception("Unterminated string")
s.advance() # ignore closing 2x-quote
return ("STRING", current)
def tokenize_regex(s):
s.advance() # ignore opening forward-slash
current = ""
while s.peek() != "/" and not s.exhausted():
current += s.advance()
if s.exhausted():
raise Exception("Unterminated regex")
s.advance() # ignore closing forward-slash
return ("REGEX", current)
def tokenize_identifier(s):
conjunctions = {
"AND",
"OR",
}
current = s.advance()
while is_alphanumeric(s.peek()):
current += s.advance()
if current.upper() in conjunctions:
return ("CONJUNCTION", current.upper())
else:
return ("IDENTIFIER", current)
################################################################################
# Parser
################################################################################
# EBNF
# Note: we order expression types by ascending levels of precedence.
#
# expression -> conjunction ;
# conjunction -> selection ( ( "AND" | "OR" )? selection )* ;
# selection -> "-"? IDENTIFIER ":" ( REGEX | STRING ) | grouping ;
# grouping -> REGEX | STRING | "(" expression ")" ;
def parse(x):
tokens = tokenize(x)
p = Parser(tokens)
return expression(p)
def expression(p):
return conjunction(p)
def conjunction(p):
lhs = selection(p)
# TODO(wpcarro): Support default AND conjuctions when they're undefined.
while not p.exhausted() and p.match({AND, OR}):
conj = p.peek(n=-1)
rhs = selection(p)
lhs = ("CONJUNCTION", conj[1], lhs, rhs)
return lhs
def selection(p):
negate = False
if p.peek() == NOT:
negate = True
p.advance()
if p.peek()[0] != "IDENTIFIER":
return grouping(p)
ident = p.expect(lambda x: x[0] == "IDENTIFIER")
colon = p.expect(lambda x: x[1] == "COLON")
value = p.expect(lambda x: x[0] in {"REGEX", "STRING"})
return ("SELECTION", negate, ident[1], value)
def grouping(p):
if p.peek()[0] == "REGEX":
return p.advance()
if p.peek()[0] == "STRING":
return p.advance()
if p.peek() == LPAREN:
p.advance()
expr = expression(p)
p.expect(lambda x: x == RPAREN)
return ("GROUPING", expr)
################################################################################
# Compiler
################################################################################
def compile(source, table, columns):
ast = parse(source)
return "SELECT * FROM {} WHERE {};".format(table, do_compile(ast, columns))
def do_compile(ast, columns):
if ast[0] == "REGEX":
cols = "({})".format(" || ".join(columns))
return "{} REGEXP '.*{}.*'".format(cols, ast[1])
if ast[0] == "STRING":
cols = "({})".format(" || ".join(columns))
return "{} LIKE '%{}%'".format(cols, ast[1])
if ast[0] == "SELECTION":
return compile_selection(ast)
if ast[0] == "CONJUNCTION":
_, conj, lhs, rhs = ast
lhs = do_compile(lhs, columns)
rhs = do_compile(rhs, columns)
return "{} {} {}".format(lhs, conj, rhs)
if ast[0] == "GROUPING":
return "({})".format(do_compile(ast[1], columns))
raise Exception("Unexpected AST: \"{}\"".format(ast))
def compile_selection(ast):
_, negate, column, query = ast
match = compile_query(negate, query)
return "{} {}".format(column, match)
def compile_query(negate, query):
query_type, query_string = query
if query_type == "REGEX":
if negate:
return "NOT REGEXP '.*{}.*'".format(query_string)
return "REGEXP '.*{}.*'".format(query_string)
if query_type == "STRING":
if negate:
return "NOT LIKE '%{}%'".format(query_string)
return "LIKE '%{}%'".format(query_string)
################################################################################
# Main
################################################################################
debug = False
def main():
# TODO(wpcarro): Read path from command-line.
con = sqlite3.connect('/depot/users/wpcarro/todo-lists/imdb/db.sqlite3')
cur = con.cursor()
# TODO(wpcarro): Read columns from CSV.
columns = ["year", "rating", "haveWatched", "director", "isCartoon", "requiresSubtitles"]
while True:
x = input("> ")
if debug:
print("tokens:\t{}".format(tokenize(x)))
print("AST:\t{}".format(parse(x)))
print("query:\t\"{}\"".format(compile(x, "Movies", columns)))
for row in cur.execute(compile(x, "Movies", columns)):
print("\t".join(str(cell) for cell in row))
if __name__ == "__main__":
main()