Python将回调函数转为可迭代对象|convert callback into generator in Python

如今各种语法糖越来越方便,以前很多难以直观理解的代码,通过语法糖包装下,变得越来越好用,典型的有 lambda表达式,协程, flow, rx 等等,用起来是不亦乐乎。

很对第三方sdk也逐渐从传统的callbck形式逐渐提供flow形式的接口,让适用方能够比较自然的接入。

比如调用一个方法调用房需要实时知道进度,假如有一个第三方模块提供一个接口如下,可以实时返回进度信息

def third_party_sdk(callback):
    # 模拟第三方SDK的生成
    for i in range(10):
        callback(str(i))

那如果我们需要调用可以如下编写,非常简单,可以在拿到process时实时打印出当前进度

def process():
    def print_process(current:str):
        print(current)
    third_party_sdk(print_process)

if __name__ == "__main__":
     process()

但是这里有一层嵌套的内部函数代码,虽然可以提出去如果想要通过迭代器输出呢?比如我想通过迭代器输出是不是更清晰,比如

if __name__ == "__main__":
    for x in process():
        print(x)

看起来很简单,但是实际上并不容易,因为不能修改调用的第三方函数,一旦调用第三方函数就是会阻塞,就没办法实时生成迭代器的data了。那怎么做呢?可以采用多线程+列队来实现,具体代码如下,希望对你有帮助。

import threading
from queue import Queue
def third_party_sdk(callback):
    # 模拟第三方SDK的生成
    for i in range(10):
        callback(str(i))


def process():
    q = Queue(maxsize=1)
    JOB_DONE = object()
    TIMEOUT = 30
    msgs = []

    def callback(msg):
        # todo 将message 通过yield返回给上层
        # msgs.append(msg)
        q.put(msg)

    def task():
        third_party_sdk(callback)
        q.put(JOB_DONE)

    t = threading.Thread(target=task)
    t.start()
    while True:
        # better set a timeout, or if task in sub-threading fails, it will result in a deadlock
        chunk = q.get(timeout=TIMEOUT)
        if chunk is JOB_DONE:
            break
        yield chunk
        
    t.join()

if __name__ == "__main__":
    for x in process():
        print(x)

虽然看起来多了很多代码,但是总归是能够实现的,再将这种封装抽成方法,那么再以后就能愉快的使用 process返回的迭代器来处理事件啦。

Leave a Reply

Your email address will not be published. Required fields are marked *