线程进程计算之多任务同步进行

hi各位大佬好,前面一篇介绍了多进程中传值的问题,这里要进行一个线程问题,即,当目前任务进行中一部分数据要用来做另一个任务(new plan),当前任务又不能停下,于是就要开一个线程执行新的任务。

For Video Recommendation in Deep learning QQ Group 277356808

For Visual in deep learning QQ Group 629530787

I’m here waiting for you

别加那么多,没必要,另外,不接受这个网页的私聊/私信!!!

 与其说是随手笔记,不如说是工作记录更恰当。今天本大佬节日,不知道有没有一起过的。韶华不为少年留,恨悠悠,几时休,便作春江都是泪,流不尽,许多愁。

【注,我的很多博文并不都是当天完成的,有的都是几天前就开始写了,只是发表那天提交,当然同时可能开了很多博文】

我想说,线程里面能开进程吗??不是Process这种,是Pool或者ThreadPool,可以试试。先实现demo

from threading import Thread
from multiprocessing.pool import ThreadPool
from multiprocessing import Process,Pool

有计数的快慢的问题,我先尝试下,看是手写的快还是熊猫库快。没想到还是我手写的快啊,小明哥厉害了。但是数据长度上来了还是熊猫库厉害啊,均是执行10次的累计时间,第一个是我手写的,第二个是熊猫库。鉴于我召回的items的量级,所以采用我自己写的没错了。

#data length 1000
time 1 : 0.007994,  time 2 : 0.029980

#10000
time 1 : 0.079951,  time 2 : 0.033979

#100000
time 1 : 0.959261,  time 2 : 0.067959

#百万
time 1 : 8.516236,  time 2 : 0.410749

鉴于时间要求,今天暂不进行线程中加多进程,先搞个线程整上。

推进中发现线程是变量进不去,,,,,尴尬,之前的进程是变量出不去(上个博文),解决红字问题,那就是传参,这个很容易,进不去是不想传参,有点懒。

今天不搞个博文都对不起粉丝。。。。明天更这个博文,不要说脏话,文明人。

拜拜

【20201112补充】

鉴于数据计算量大,只有一个线程根本不行,数据直接淹没了,这个线程是否活着也不得而知(因为主进程结束任务后,该线程的过程日志也不会打印在log上)。准备搞16个进程试试看看。昨天的demo线程代码在此

    k=12
    p=11
    def update_model(k,p):
        k+=1
        print("p: %d, k: %d\n"%(p,k))    


    thread1=Thread(target= update_model,args=(k,p,))          
    thread1.setDaemon(True)
    thread1.start()

    print("out: %d\n"%k)

按照上面的修改后直接报错,这是线程中的经典错误。这就是说我昨天的程序根本就没跑起来啊。除却五侯歌舞地,人间何处不相随。

Fatal Python error: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads

Thread 0x00007ff69068b700 (most recent call first):
  File "thread_pool_.py", line 92 in update_model
  File "/data/logs/xulm1/myconda/lib/python3.7/threading.py", line 870 in run
  File "/data/logs/xulm1/myconda/lib/python3.7/threading.py", line 926 in _bootstrap_inner
  File "/data/logs/xulm1/myconda/lib/python3.7/threading.py", line 890 in _bootstrap

Current thread 0x00007ff6a9243700 (most recent call first):
已放弃

一个博主说,将此线程函数写一个文件,也就是说线程只需要能执行那个文件即可,不知道能不能行,试试

   thread1.start()
   thread1.join()

改成这种是不行的,我还以为在执行了,top命令发现有python3的程序,后来发现是之前别人的服务,我都想给他kill了,下面是查运行的程序的主要方法

$ ps aux | grep python

我先试试直接运行函数可以不,直接运行32ThreadPool,10万用户要2min,改下其他参数看影响大不大,16个反而时间更少了。。。。吃惊,后来发现大于16就没啥用了

修改其他参数影响不大,增加用户数为100万则时间成倍增加,近20min

下面要采用别人的方法要面临一个问题,如何给文件传参数,argparse/sys的格式能不能行?这种也就传个字符串,数字吧,没见过传一个大的数组,下面是常见两种方式,还有其他方式多了,

os.system("python3 main.py --task train")
os.popen(cmd)

这种是不行的,所以说,那个博主的原本意思是在其他py文件中写个函数,然后导入,这样就不会出上面的“已放弃”错误了(这个已放弃不是我加的)

下面再次直接试试,因为我加了global变量,为了能够看到效果,因为即使打印运行过程,线程中的在主程序结束后的日志是无法打印在日志中的,所以我将处理结果存储在redis集群,2h有效期,当然也可为了测试考虑,整个time.sleep,这样打印的日志能看到。

增加global后没有报错,现在就是等一个小时,趁此时间搞下GCEGNN,这个issue还是没解决啊,欲说还休,欲说还休,却道天凉好个秋。

事实证明,添加sleep是可以看到打印的日志,而且也已经存上redis了(如下),但我还是想证明下,如果主程序结束了,这个线程会不会活着?

  1) "173"
  2) "214"
  3) "244"
  4) "282"
  5) "298"
  6) "331"
  7) "506"
  8) "680"
  9) "765"
 10) "779"
 11) "784"
 12) "810"
 13) "816"
 14) "995"
 15) "1050"

服务器只有用了才是自己的,赶紧搞起来啊!!不然服务器都没了。都是共同申请的,啥时候有我专用的呢?GNN与DGCF同时进行,后者还没看paper,who care?

主程序运行完后,线程在top及我上面提及的方法中查询不到是否在运行,只能通过查看redis是否存储了。

然而过了近两个小时,我发现redis没有存储啊。。。。而且,主程序结束时也没有那么大的内存,这就是说明,主程序结束,线程就死了??下面拿小demo做试验证实或证伪(那就需要改程序了)

def update_model():
    results=[[0,[1,2,3]],[1,[2,3,4]]]#redis保存这个

    #redis写入省略
    。。。

#线程同最上,后面加
time.sleep(100)
#保证能存上

修改为如下,程序结束等了三分钟也没存入redis,说明程序真的死了,证明了假设,红字假设也是网上常见的问题。

def update_model():
    results=[[0,[1,2,3]],[1,[2,3,4]]]#redis保存这个
    time.sleep(100)
    #redis写入省略
    。。。

#线程同最上,后面加
time.sleep(1)
#保证能存上

既然搞个线程不能解决,那就来两个进程可好,同时进行,也就是说添加了一个进程

直接加个Process即可解决,也就是说原来的程序可以不变,加个Process能跳过吗???(像线程那样,能够同时进行吗?)证实可以跳过,可在start后加个标志打印下即可,如下示例:进程代码参考上一篇(不要join),不再赘述

    print("main")
    #time.sleep(10)
    #。。进程
    print("main 2")
    p.start()
    print("main 3")

#打印结果
main
main 2
main 3
starting @ 2020-11-12 19:37:58.584438s

bk=1
saving time 10.072225
saving finished @ 2020-11-12 19:38:08.656873s

#后面的是update_model中的打印日志,说明跳过了start

今天任务完成了,这是完整版的,希望粉丝多多捧场。

来何容易去何迟,半在心头半在眉。

 

【20201113补充】

昨天的没有执行好,因为熊猫库处理太慢了,整成字典很快。给占用GPU的程序加了进程,只是start,后来发现不行,要有join,难道说没有join,整个流程结束了也不会终止吗?下面测试demo(参考上一篇)

欲上高楼去避愁,愁还随我上高楼。

Process MyProcess-1:
Traceback (most recent call last):
  File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "test_update_rec_items.py", line 405, in run
    next_task = self.task_queue.get()
  File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/data/logs/xulm1/myconda/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt

强行终止后发现这种,于是增加task为None放最后,不然while死循环了【不要问我在说啥子,真正了解了就不会这么问了,不懂就自己实践下】

再等1h,顺便搞下GNN的破事,

搞定了,拜拜。

 

谁念西风独自凉,萧萧黄叶闭疏窗。

本文地址:https://blog.csdn.net/SPESEG/article/details/109601661

(0)
上一篇 2022年3月22日
下一篇 2022年3月22日

相关推荐