#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @__Date__: 2022-10-18
# @__Author__: 苏莫
# @__PythonVersion__: python3
# @__name__: malina.py
import os
import json
import queue
import threading
from urllib import parse

import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent


class Malina(object):
    """Search a track listing site and download every matching track as MP3.

    The search term is URL-quoted and appended to ``url``; result pages are
    then fetched as ``<url>/<search>`` and ``<url>/<search>/<n>`` for
    ``n >= 2``.  Every track found is queued and downloaded by a pool of
    worker threads into a directory named after the search term.

    Attributes:
        url: Full URL of the first result page.
        save_path: Directory the MP3 files are written to (the search term).
        asmr: List of ``{"name": ..., "url": ...}`` dicts for every track found.
        thread_num: Number of download worker threads.
        q: Queue of tracks still waiting to be downloaded.
        num: Exclusive upper bound of the page range (last page number + 1).
        ua: ``fake_useragent.UserAgent`` used to pick request User-Agents.
    """

    # Seconds to wait for a connection / for the next bytes before giving up,
    # so a stalled server cannot hang the crawl or a worker forever.
    REQUEST_TIMEOUT = 30
    # Bytes read per iteration while streaming a download to disk.
    CHUNK_SIZE = 64 * 1024
    # Characters that are not allowed in file names on common file systems.
    _ILLEGAL_FILENAME_CHARS = str.maketrans(
        {ch: "_" for ch in '\\/:*?"<>|'}
    )

    def __init__(self, search: str,
                 url="https://media.mp3-malina.me/tracks", thread_num=10):
        """Build the search URL and initialise the download state.

        Args:
            search: Search term; also used as the output directory name.
            url: Base URL of the track listing.
            thread_num: Number of concurrent download threads.
        """
        # The original called super(ASMRbfy, self), a name that does not
        # exist in this module and raised NameError on construction.
        super().__init__()
        separator = "" if url.endswith("/") else "/"
        self.url = url + separator + parse.quote(search)
        self.save_path = search
        self.asmr = []
        self.thread_num = thread_num
        self.q = queue.Queue()
        self.num = 1
        self.ua = UserAgent()

    def _headers(self) -> dict:
        """Return request headers carrying a randomly chosen User-Agent."""
        return {"user-Agent": self.ua.random}

    @classmethod
    def _safe_filename(cls, name: str) -> str:
        """Turn a track title into a string that is usable as a file name.

        Whitespace runs (including newlines) are collapsed to single spaces
        and path separators / reserved characters are replaced by ``_``.
        """
        cleaned = " ".join(name.split()).translate(cls._ILLEGAL_FILENAME_CHARS)
        return cleaned or "untitled"

    # Collect track names and download links.
    def getMP4List(self, content: str):
        """Parse one result page and queue every track found on it.

        Each ``<a class="link">`` is paired, in document order, with a
        ``<div class="title">``.
        NOTE(review): this assumes the page contains exactly one title div
        per link, in the same order -- confirm against the site markup.

        Args:
            content: HTML text of a result page.
        """
        soup = BeautifulSoup(content, "html.parser")
        all_href_tags = soup.find_all("a", class_="link")
        all_title_tags = soup.find_all("div", class_="title")
        for href_tag, title_tag in zip(all_href_tags, all_title_tags):
            href = href_tag.get("href")
            if not href:
                # An anchor without a target cannot be downloaded.
                continue
            if href.startswith("//"):
                # Protocol-relative link: force https, as before.
                link = "https:" + href
            else:
                # Absolute or site-relative link: resolve against the page.
                link = parse.urljoin(self.url, href)
            result = {"name": title_tag.get_text(), "url": link}
            self.q.put(result)
            self.asmr.append(result)

    # Determine the number of result pages.
    def getNum(self, content: str):
        """Read the last page number from the pagination and store it.

        ``self.num`` is set to ``last_page + 1`` so it can be used directly
        as the end of ``range(2, self.num)``.  When the page has no
        pagination element, or its text is not a number, a single page is
        assumed.

        Args:
            content: HTML text of the first result page.
        """
        soup = BeautifulSoup(content, "html.parser")
        num_tags = soup.find_all("div", class_="inactive-p")
        try:
            last_page = int(num_tags[-1].get_text())
        except (IndexError, ValueError):
            last_page = 1
        self.num = last_page + 1

    # Crawl every result page, then download.
    def getUrl(self):
        """Fetch all result pages, queue their tracks and start downloading."""
        # One User-Agent is picked for the whole listing crawl.
        headers = self._headers()
        response = requests.get(
            self.url, headers=headers, timeout=self.REQUEST_TIMEOUT
        )
        self.getNum(response.text)
        self.getMP4List(response.text)
        for n in range(2, self.num):
            page_url = self.url + "/" + str(n)
            response = requests.get(
                page_url, headers=headers, timeout=self.REQUEST_TIMEOUT
            )
            self.getMP4List(response.text)
        self.output()

    # Download everything that has been queued.
    def output(self):
        """Create the output directory and run the download worker threads.

        Blocks until every worker has finished.
        """
        os.makedirs(self.save_path, exist_ok=True)

        workers = [
            threading.Thread(target=self.save_asmr)
            for _ in range(self.thread_num)
        ]
        for worker in workers:
            worker.start()
        # Wait for all downloads to complete.
        for worker in workers:
            worker.join()

    # Worker loop: download queued tracks until the queue is empty.
    def save_asmr(self):
        """Take tracks off the queue and download them until none are left."""
        while True:
            try:
                # get_nowait() instead of empty() + get(): with several
                # workers another thread may take the last item between the
                # two calls, leaving this one blocked forever in get().
                asmr_msg = self.q.get_nowait()
            except queue.Empty:
                return
            self._download(asmr_msg)

    def _download(self, asmr_msg: dict) -> None:
        """Stream one track to ``<save_path>/<name>.mp3``.

        Failures are reported on stdout and any partially written file is
        removed; they never propagate, so one bad track cannot stop a worker.

        Args:
            asmr_msg: Dict with the track's ``"name"`` and ``"url"``.
        """
        name = asmr_msg["name"]
        # Computed before the try block so the cleanup below can rely on it.
        mp3_path = os.path.join(
            self.save_path, self._safe_filename(name) + ".mp3"
        )
        print("[*] 正在下载《%s》" % name)
        try:
            # Stream the body so large files are never held in memory; the
            # context manager returns the connection to the pool.
            with requests.get(asmr_msg["url"], headers=self._headers(),
                              stream=True,
                              timeout=self.REQUEST_TIMEOUT) as req:
                # Do not save an HTTP error page as if it were audio.
                req.raise_for_status()
                with open(mp3_path, "wb") as f:
                    for chunk in req.iter_content(chunk_size=self.CHUNK_SIZE):
                        if chunk:
                            f.write(chunk)
        except Exception as e:
            # Thread boundary: report and carry on with the next track.
            print("[error] %s" % e)
            try:
                os.remove(mp3_path)
            except OSError:
                pass


if __name__ == '__main__':
    # Pass thread_num to change the number of download threads,
    # e.g. Malina("family affair", thread_num=20).
    asmr = Malina("family affair")
    asmr.getUrl()
讯享网
多线程爬取音频，可根据个人网速调整线程数量（通过 thread_num 参数设置）。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/30304.html