python sqlite3 row_factory and text_factory

row_factory

主要功能是改變sqlite抓取到資料,回傳的output的格式。

例如原先的output type是tuple:

cursor.execute("select 1 as a")
result = cursor.fetchone()
print type(result) is tuple

如果要使用dict,可以覆蓋row_factory

import sqlite3

def dict_factory(cursor, row):
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d

con = sqlite3.connect(":memory:")
con.row_factory = dict_factory
cur = con.cursor()
cur.execute("select 1 as a")
print cur.fetchone()["a"]

其實也可以直接使用官方預設的sqlite3/336/)./336/)Row

con.row_factory = sqlite3.Row
cur = con.cursor()
cur.execute("select 1 as a")
result = cur.fetchone()
#1
print result["a"]
#1
print result[0]
#1
print len( result )
#["a"]
print result.keys()

text_factory

只要query出來的內容是text,就會經過這個function,可以用於字串編碼轉換,當然也可以利用這個方式去修改內容.。

以下是簡單example:

import sqlite3

con = sqlite3.connect(":memory:")
cur = con.cursor()

AUSTRIA = u"\xd6sterreich"

cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone()

#<type 'unicode'>
print type(row[0])

con.text_factory = str
cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone()

#<type 'str'>
print type(row[0])

con.text_factory = lambda x: unicode(x, "utf-8", "ignore")
cur.execute("select ?", ("this is latin1 and would normally create errors" +
                         u"\xe4\xf6\xfc".encode("latin1"),))
row = cur.fetchone()

#<type 'unicode'>
print type(row[0])

con.text_factory = sqlite3.OptimizedUnicode
cur.execute("select ?", (AUSTRIA,))
row = cur.fetchone()

#<type 'unicode'>
print type(row[0])

con.text_factory = sqlite3.OptimizedUnicode
cur.execute("select ?", ("test",))
row = cur.fetchone()

#<type 'str'>
print type(row[0])

其中sqlite3.OptimizedUnicode會自動偵測轉換不同編碼,如果字串是 ASCII可以顯示的,將會回傳str,否則回傳unucode。

 

題外話:

如果發現import sqlite3找不到module…,讓就必須在重新編譯python…,在編譯之前先執行:

sudo apt-get install libsqlite3-dev

在重新編譯python安裝即可。

python sqlite3 create function、aggregate and collation

create_function

如果在sqlite想自訂function,可使用create_function來處理,以下是範例:

import sqlite3
import md5       

def md5sum(t):
    return md5.md5(t).hexdigest()

con = sqlite3.connect(":memory:")
con.create_function("md5", 1, md5sum)
cur = con.cursor()
cur.execute("create table test(i)")
cur.execute("insert into test(i) values (1)")
cur.execute("insert into test(i) values (2)")
cur.execute("select md5(i) from test")
print cur.fetchall()

執行後會回傳以下結果:

[(u'c4ca4238a0b923820dcc509a6f75849b',), (u'c81e728d9d4c2f636f067f89cc14862c',)]

從上面結果可以看出,在每筆資料中都是獨立的,只將原先輸入的值或欄位值去處理。另外connect的部份,是直接使用記憶體,而不是file方式,所以程式結束後,就釋放掉了。

create_aggregate

那麼如果製作一個類似count、sum之類的功能,可以使用create_aggregate,以下是官方範例:

import sqlite3 

class MySum:   
    def __init__(self):
        self.count = 0

    def step(self, value):
        self.count += value 

    def finalize(self):
        return self.count

con = sqlite3.connect(":memory:")
con.create_aggregate("mysum", 1, MySum)
cur = con.cursor()
cur.execute("create table test(i)")
cur.execute("insert into test(i) values (1)")
cur.execute("insert into test(i) values (2)")
cur.execute("select mysum(i) from test")
print cur.fetchall()

執行後回傳結果:

[(3,)]

當執行select的時候,會呼叫mysum,mysum的step會將所有傳入值累加,在由finalize回傳結果,這樣就可以處理每筆資料之間的關係。

create_collation

最後create_collation處理的功能,就是排序問題,可以依照自己自訂方式排序,官方提供的範例是反向功能(reverse),如下:

import sqlite3 

def collate_reverse(string1, string2):                
    return -cmp(string1, string2)                     

con = sqlite3.connect(":memory:")                     
con.create_collation("reverse", collate_reverse)      

cur = con.cursor()                                    
cur.execute("create table test(x)")                   
cur.executemany("insert into test(x) values (?)", [("a",), ("b",)])
cur.execute("select x from test order by x collate reverse")
for row in cur:
    print row

con.close() 

執行結果如下:

(u'b',)
(u'a',)

collate_reverse(cmp)會將兩個值做比較,將會回傳-1、0、1這3種結果:

string1 < string2 回傳 -1 
string1 == string2 回傳 0 
string1 > string 2 回傳 1

所以官方範例,最後只要在cmp加個負號,所有排序都會相反。