熱線電話:13121318867

登錄
首頁精彩閱讀使用Python進行穩定可靠的文件操作詳解
使用Python進行穩定可靠的文件操作詳解
2017-11-26
收藏

使用Python進行穩定可靠的文件操作詳解

考慮下述Python代碼片段。對文件中的數據進行某些操作,然后將結果保存回文件中:

代碼如下:
with open(filename) as f:
   input = f.read()
output = do_something(input)
with open(filename, 'w') as f:
   f.write(output)

看起來很簡單吧?可能看起來并不像乍一看這么簡單。我在產品服務器中調試應用,經常會出現奇怪的行為。
這是我看過的失效模式的例子:
失控的服務器進程溢出大量日志,磁盤被填滿。write()在截斷文件之后拋出異常,文件將會變成空的。
應用的幾個實例并行執行。在各個實例結束之后,因為混合了多個實例的輸出,文件內容最終變成了天書。
在完成了寫操作之后,應用會觸發一些后續操作。幾秒鐘后斷電。在我們重啟了服務器之后,我們再一次看到了舊的文件內容。已經傳遞給其它應用的數據與我們在文件中看到的不再一致。
下面沒有什么新的內容。本文的目的是為在系統編程方面缺少經驗的Python開發者提供常見的方法和技術。我將會提供代碼例子,使得開發者可以很容易的將這些方法應用到自己的代碼中。
“可靠性”意味著什么?
廣義的講,可靠性意味著在所有規定的條件下操作都能執行它所需的函數。至于文件的操作,這個函數就是創建,替換或者追加文件的內容的問題。這里可以從數據庫理論上獲得靈感。經典的事務模型的ACID性質作為指導來提高可靠性。
開始之前,讓我們先看看我們的例子怎樣和ACID4個性質扯上關系:
原子性(Atomicity)要求這個事務要么完全成功,要么完全失敗。在上面的實例中,磁盤滿了可能導致部分內容寫入文件。另外,如果正當在寫入內容時其它程序又在讀取文件,它們可能獲得是部分完成的版本,甚至會導致寫錯誤
一致性(Consistency) 表示操作必須從系統的一個狀態到另一個狀態。一致性可以分為兩部分:內部和外部一致性。內部一致性是指文件的數據結構是一致的。外部一致性是指文件的內容與它相關的數據是相符合的。在這個例子中,因為我們不了解這個應用,所以很難推斷是否符合一致性。但是因為一致性需要原子性,我們至少可以說沒有保證內部一致性。
隔離性(Isolation)如果在并發的執行事務中,多個相同的事務導致了不同的結果,就違反了隔離性。很明顯上面的代碼對操作失敗或者其它隔離性失敗都沒有保護。
持久性(Durability)意味著改變是持久不變的。在我們告訴用戶成功之前,我們必須確保我們的數據存儲是可靠的并且不只是一個寫緩存。上面的代碼已經成功寫入數據的前提是假設我們調用write()函數,磁盤I/O就立即執行。但是POSIX標準是不保證這個假設的。
盡可能使用數據庫系統
如果我們能夠獲得ACID 四個性質,那么我們增加可靠性方面取得了長遠發展。但是這需要很大的編碼功勞。為什么重復發明輪子?大多數數據庫系統已經有ACID事務了。
可靠性數據存儲已經是一個已解決的問題。如果你需要可靠性存儲,請使用數據庫。很可能,沒有幾十年的功夫,你自己解決這方面的能力沒有那些已經專注這方面好些年的人好。如果你不想安裝一個大數據庫服務器,那么你可以使用sqlite,它具有ACID事務,很小,免費的,而且它包含在Python的標準庫中。
文章本該在這里就結束的,但是還有一些有根有據的原因,就是不使用數據。它們通常是文件格式或者文件位置約束。這兩個在數據庫系統中都不好控制。理由如下:
我們必須處理其它應用產生的固定格式或者在固定位置的文件,
我們必須為了其它應用的消耗而寫文件(和應用了同樣的限制條件)
我們的文件必須方便人閱讀或者修改。
如果我們自己動手實現可靠的文件更新,那么這里有一些編程技術供參考。下面我將展示四種常見的操作文件更新模式。在那之后,我會討論采取哪些步驟在每個文件更新模式下滿足ACID性質。
文件更新模式
文件可以以多種方式更新,但是我認為至少有四種常見的模式。這四種模式將做為本文剩余部分的基礎。
截斷-寫
這可能是最基本的模式。在下述例子中,假設的域模型代碼讀數據,執行一些計算,然后以寫模式重新打開存在的文件:

代碼如下:
with open(filename, 'r') as f:
   model.read(f)
model.process()
with open(filename, 'w') as f:
   model.write(f)

此模式的一個變種以讀寫模式打開文件(Python中的“加”模式),尋找到開始的位置,顯式調用truncate(),重寫文件內容。

代碼如下:
with open(filename, 'a+') as f:
   f.seek(0)
   model.input(f.read())
   model.compute()
   f.seek(0)
   f.truncate()
   f.write(model.output())

該變種的優勢是只打開文件一次,始終保持文件打開。舉例來說,這樣可以簡化加鎖。
寫-替換
另外一種廣泛使用的模式是將新內容寫到臨時文件,之后替換原始文件:

代碼如下:
with tempfile.NamedTemporaryFile(
      'w', dir=os.path.dirname(filename), delete=False) as tf:
   tf.write(model.output())
   tempname = tf.name
os.rename(tempname, filename)

該方法與截斷-寫方法相比對錯誤更具有魯棒性。請看下面對原子性和一致性的討論。很多應用使用該方法。
這兩個模式很常見,以至于linux內核中的ext4文件系統甚至可以自動檢測到這些模式,自動修復一些可靠性缺陷。但是不要依賴這一特性:你并不是總是使用ext4,而且管理員可能會關掉這一特性。
追加

第三種模式就是追加新數據到已存在的文件:

代碼如下:
with open(filename, 'a') as f:
   f.write(model.output())

這個模式用來寫日志文件和其它累積處理數據的任務。從技術上講,它的顯著特點就是極其簡單。一個有趣的擴展應用就是常規操作中只通過追加操作更新,然后定期重新整理文件,使之更緊湊。
Spooldir
這里我們將目錄做為邏輯數據存儲,為每條記錄創建新的唯一命名的文件:

代碼如下:
with open(unique_filename(), 'w') as f:
   f.write(model.output())

該模式與附加模式一樣具有累積的特點。一個巨大的優勢是我們可以在文件名中放入少量元數據。舉例來說,這可以用于傳達處理狀態的信息。spooldir模式的一個特別巧妙的實現是maildir格式。maildirs使用附加子目錄的命名方案,以可靠的、無鎖的方式執行更新操作。md和gocept.filestore庫為maildir操作提供了方便的封裝。
如果你的文件名生成不能保證唯一的結果,甚至有可能要求文件必須實際上是新的。那么調用具有合適標志的低等級os.open():

代碼如下:
fd = os.open(filename, os.O_WRONLY | os.O_CREAT| os.O_EXCL, 0o666)
with os.fdopen(fd, 'w') as f:
   f.write(...)

在以O_EXCL方式打開文件后,我們用os.fdopen將原始的文件描述符轉化為普通的Python文件對象。
應用ACID屬性到文件更新
下面,我將嘗試加強文件更新模式。反過來讓我們看看可以做些什么來滿足ACID屬性。我將會盡可能保持簡單,因為我們并不是要寫一個完整的數據庫系統。請注意本節的材料并不徹底,但是可以為你自己的實驗提供一個好的起點。
原子性
寫-替換模式提供了原子性,因為底層的os.rename()是原子性的。這意味著在任意給定時間點,進程或者看到舊的文件,或者看到新的文件。該模式對寫錯誤具有天然的魯棒性:如果寫操作觸發異常,重命名操作就不會被執行,所有就沒有用損壞的新文件覆蓋正確的舊文件的風險。
附加模式并不是原子性的,因為有附加不完整記錄的風險。但是有個技巧可以使更新具有原子性:為每個寫操作標注校驗和。之后讀日志的時候,忽略所有沒有有效校驗和的記錄。以這種方式,只有完整的記錄才會被處理。在下面的例子中,應用做周期性的測量,每次在日志中附加一行JSON記錄。我們計算記錄的字節表示形式的CRC32校驗和,然后附加到同一行:

代碼如下:
with open(logfile, 'ab') as f:
    for i in range(3):
        measure = {'timestamp': time.time(), 'value': random.random()}
        record = json.dumps(measure).encode()
        checksum = '{:8x}'.format(zlib.crc32(record)).encode()
        f.write(record + b' ' + checksum + b'\n')

該例子代碼通過每次創建隨機值模擬測量。

代碼如下:
$ cat log
{"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a
{"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22
{"timestamp": 1373396987.258291, "value": 0.232021160265939} d229d937

想要處理這個日志文件,我們每次讀一行記錄,分離校驗和,與讀到的記錄比較。

代碼如下:
with open(logfile, 'rb') as f:
    for line in f:
        record, checksum = line.strip().rsplit(b' ', 1)
        if checksum.decode() == '{:8x}'.format(zlib.crc32(record)):
            print('read measure: {}'.format(json.loads(record.decode())))
        else:
            print('checksum error for record {}'.format(record))

現在我們通過截斷最后一行模擬被截斷的寫操作:

代碼如下:
$ cat log
{"timestamp": 1373396987.258189, "value": 0.9360123151217828} 9495b87a
{"timestamp": 1373396987.25825, "value": 0.40429005476999424} 149afc22
{"timestamp": 1373396987.258291, "value": 0.23202

當讀日志的時候,最后不完整的一行被拒絕:

代碼如下:
$ read_checksummed_log.py log
read measure: {'timestamp': 1373396987.258189, 'value': 0.9360123151217828}
read measure: {'timestamp': 1373396987.25825, 'value': 0.40429005476999424}
checksum error for record b'{"timestamp": 1373396987.258291, "value":'

添加校驗和到日志記錄的方法被用于大量應用,包括很多數據庫系統。
spooldir中的單個文件也可以在每個文件中添加校驗和。另外一個可能更簡單的方法是借用寫-替換模式:首先將文件寫到一邊,然后移到最終的位置。設計一個保護正在被消費者處理的文件的命名方案。在下面的例子中,所有以.tmp結尾的文件都會被讀取程序忽略,因此在寫操作的時候可以安全的使用。

代碼如下:
newfile = generate_id()
with open(newfile + '.tmp', 'w') as f:
   f.write(model.output())
os.rename(newfile + '.tmp', newfile)

最后,截斷-寫是非原子性的。很遺憾我不能提供滿足原子性的變種。在執行完截取操作后,文件是空的,還沒有新內容寫入。如果并發的程序現在讀文件或者有異常發生,程序中止,我們既看不久的版本也看不到新的版本。
一致性
我談論的關于原子性的大部分內容也可以應用到一致性。實際上,原子性更新是內部一致性的前提條件。外部一致性意味著同步更新幾個文件。這不容易做到,鎖文件可以用來確保讀寫訪問互不干涉??紤]某目錄下的文件需要互相保持一致。常用的模式是指定鎖文件,用來控制對整個目錄的訪問。
寫程序的例子:

代碼如下:
with open(os.path.join(dirname, '.lock'), 'a+') as lockfile:
   fcntl.flock(lockfile, fcntl.LOCK_EX)
   model.update(dirname)

讀程序的例子:

代碼如下:
with open(os.path.join(dirname, '.lock'), 'a+') as lockfile:
   fcntl.flock(lockfile, fcntl.LOCK_SH)
   model.readall(dirname)

該方法只有控制所有讀程序才生效。因為每次只有一個寫程序活動(獨占鎖阻塞所有共享鎖),所有該方法的可擴展性有限。
更進一步,我們可以對整個目錄應用寫-替換模式。這涉及為每次更新創建新的目錄,更新完成后改變符合鏈接。舉例來說,鏡像應用維護一個包含壓縮包和列出了文件名、文件大小和校驗和的索引文件的目錄。當上流的鏡像更新,僅僅隔離地對壓縮包和索引文件進項原子性更新是不夠的。相反,我們需要同時提供壓縮包和索引文件以免校驗和不匹配。為了解決這個問題,我們為每次生成維護一個子目錄,然后改變符號鏈接激活該次生成。

代碼如下:
mirror
|-- 483
|   |-- a.tgz
|   |-- b.tgz
|   `-- index.json
|-- 484
|   |-- a.tgz
|   |-- b.tgz
|   |-- c.tgz
|   `-- index.json
`-- current -> 483

新的生成484正在被更新的過程中。當所有壓縮包準備好,索引文件更新后,我們可以用一次原子調用os.symlink()來切換current符號鏈接。其它應用總是或者看到完全舊的或者完全新的生成。讀程序需要使用os.chdir()進入current目錄,很重要的是不要用完整路徑名指定文件。否在當讀程序打開current/index.json,然后打開current/a.tgz,但是同時符號鏈接已經改變時就會出現競爭條件。
隔離性
隔離性意味著對同一文件的并發更新是可串行化的——存在一個串行調度使得實際執行的并行調度返回相同的結果?!罢鎸嵉摹睌祿煜到y使用像MVCC這種高級技術維護可串行性,同時允許高等級的可并行性?;氐轿覀兊膱鼍?,我們最后使用加鎖來串行文件更新。
對截斷-寫更新進行加鎖是容易的。僅僅在所有文件操作前獲取一個獨占鎖就可以。下面的例子代碼從文件中讀取一個整數,然后遞增,最后更新文件:

代碼如下:
def update():
   with open(filename, 'r+') as f:
      fcntl.flock(f, fcntl.LOCK_EX)
      n = int(f.read())
      n += 1
      f.seek(0)
      f.truncate()
      f.write('{}\n'.format(n))

使用寫-替換模式加鎖更新就有點兒麻煩啦。像 截斷-寫那樣使用鎖可能導致更新沖突。某個幼稚的實現可能看起來像這樣

代碼如下:
def update():
   with open(filename) as f:
      fcntl.flock(f, fcntl.LOCK_EX)
      n = int(f.read())
      n += 1
      with tempfile.NamedTemporaryFile(
            'w', dir=os.path.dirname(filename), delete=False) as tf:
         tf.write('{}\n'.format(n))
         tempname = tf.name
      os.rename(tempname, filename)

這段代碼有什么問題呢?設想兩個進程競爭更新某個文件。第一個進程運行在前面,但是第二個進程阻塞在fcntl.flock()調用。當第一個進程替換了文件,釋放了鎖,現在在第二個進程中打開的文件描述符指向了一個包含舊內容的“幽靈”文件(任意路徑名都不可達)。想要避免這個沖突,我們必須檢查打開的文件是否與fcntl.flock()返回的相同。所以我寫了一個新的LockedOpen上下文管理器來替換內建的open上下文。來確保我們實際打開了正確的文件:

代碼如下:
class LockedOpen(object):

    def __init__(self, filename, *args, **kwargs):
        self.filename = filename
        self.open_args = args
        self.open_kwargs = kwargs
        self.fileobj = None

    def __enter__(self):
        f = open(self.filename, *self.open_args, **self.open_kwargs)
        while True:
            fcntl.flock(f, fcntl.LOCK_EX)
            fnew = open(self.filename, *self.open_args, **self.open_kwargs)
            if os.path.sameopenfile(f.fileno(), fnew.fileno()):
                fnew.close()
                break
            else:
                f.close()
                f = fnew
        self.fileobj = f
        return f

    def __exit__(self, _exc_type, _exc_value, _traceback):
        self.fileobj.close()

給追加更新上鎖如同給截斷-寫更新上鎖一樣簡單:需要一個排他鎖,然后追加就完成了。需要長期運行的會將文件長久的打開的進程,可以在更新時釋放鎖,讓其它進入。
spooldir模式有個很優美的性質就是它不需要任何鎖。此外,你建立在使用靈活的命名模式和一個健壯的文件名分代。郵件目錄規范就是一個spooldir模式的好例子。它可以很容易的適應其它情況,不僅僅是處理郵件。
持久性
持久性有點特殊,因為它不僅依賴于應用,也與OS和硬件配置有關。理論上來說,我們可以假定,如果數據沒有到達持久存儲,os.fsync()或os.fdatasync()調用就沒有返回結果。在實際情況中,我們有可能會遇到幾個問題:我們可能會面對不完整的fsync實現,或者糟糕的磁盤控制器配置,它們都無法提供任何持久化的保證。有一個來自 MySQL 開發者 的討論對哪里會發生錯誤進行了詳盡的討論。有些像PostgreSQL 之類的數據庫系統,甚至提供了持久化機制的選擇 ,以便管理員在運行時刻選擇最佳的一個。然而不走運的人只能使用os.fsync(),并期待它可以被正確的實現。
通過截斷-寫模式,在結束寫操作以后關閉文件以前,我們需要發送一個同步信號。注意通常這還牽涉到另一個層次的寫緩存。glibc 緩存 甚至會在寫操作傳遞到內核以前,在進程內部攔住它。同樣為了得到空的glibc緩存,我們需要在同步以前對它flush():

代碼如下:
with open(filename, 'w') as f:
   model.write(f)
   f.flush()
   os.fdatasync(f)

要不,你也可以帶參數-u調用Python,以此為所有的文件I/O獲得未緩沖的寫。
大多數時候相較os.fsync()我更喜歡os.fdatasync(),以此避免同步元數據的更新(所有權、大小、mtime…)。元數據的更新可最終導致磁盤I/O搜索操作,這會使整個過程慢不少。
對寫-替換風格更新使用同樣的技巧只是成功了一半。我們得確保在代替舊文件之前,新寫入文件的內容已經寫入了非易失性存儲器上了,但是替換操作怎么辦?我們不能保證那個目錄更新是否執行的剛剛好。在網絡上有很多關于怎么讓同步目錄更新的長篇大論。但是在我們這種情況,舊文件和新文件都在同一個目錄下,我們可以使用簡單的解決方案來逃避這個這題。

代碼如下:
os.rename(tempname, filename)
dirfd = os.open(os.path.dirname(filename), os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)

我們調用底層的os.open()來打開目錄(Python自帶的open()方法不支持打開目錄),然后在目錄文件描述符上執行os.fsync()。
對待追加更新和我以及說過的截斷-寫是相似的。
spooldir模式與寫-替換模式同樣的目錄同步問題。幸運地是,可以使用同樣的解決方案:第一步同步文件,然后同步目錄。

總結

這使可靠的更新文件成為可能。我已經演示了滿足ACID的四大性質。這些展示的實例代碼充當一個工具箱。掌握這編程技術最大的滿足你的需求。有時,你并不需要滿足所有的ACID性質,可能僅僅需要一到兩個。我希望這篇文章可以幫助你去做已充分了解的決定,什么該去實現以及什么該舍棄。

數據分析咨詢請掃描二維碼

若不方便掃碼,搜微信號:CDAshujufenxi

數據分析師資訊
更多

OK
客服在線
立即咨詢
日韩人妻系列无码专区视频,先锋高清无码,无码免费视欧非,国精产品一区一区三区无码
客服在線
立即咨詢