python实现语音识别前期准备(调用科大讯飞平台)

Rosalia ·
更新时间:2024-11-10
· 752 次阅读

本demo测试时运行的环境为:Windows + Python3.7
本demo测试成功运行时所安装的第三方库及其版本如下,您可自行逐一或者复制到一个新的txt文件利用pip一次性安装:
cffi1.12.3
gevent
1.4.0
greenlet0.4.15
pycparser
2.19
six1.12.0
websocket
0.2.1
websocket-client==0.56.0
语音听写流式 WebAPI 接口调用示例 接口文档(必看):https://doc.xfyun.cn/rest_api/语音听写(流式版).html
webapi 听写服务参考帖子(必看):http://bbs.xfyun.cn/forum.php?mod=viewthread&tid=38947&extra=
语音听写流式WebAPI 服务,热词使用方式:登陆开放平台https://www.xfyun.cn/后,找到控制台–我的应用—语音听写(流式)—服务管理–个性化热词,
设置热词
注意:热词只能在识别的时候会增加热词的识别权重,需要注意的是增加相应词条的识别率,但并不是绝对的,具体效果以您测试为准。
语音听写流式WebAPI 服务,方言试用方法:登陆开放平台https://www.xfyun.cn/后,找到控制台–我的应用—语音听写(流式)—服务管理–识别语种列表
可添加语种或方言,添加后会显示该方言的参数值
错误码链接:https://www.xfyun.cn/document/error-code (code返回错误码时必看)
注释:下载以上这些,我们可以使用清华源下载会比较快.
链接:https://www.cnblogs.com/sky-ai/p/9800036.html

1.装完这些以后,下载demo,本人下载的是语音转写,大家可以在网址中进行选择并下载。

2.在做下面步骤之前,大家可以先看8.(在下方)

3.由于语音识别的文件有多个,我们需要将其转化为与之对应的文本,将其保留在.csv文件中
代码如下:

# -*- coding: utf-8 -* # # 非实时转写调用demo import base64 import hashlib import hmac import json import os import time import csv import pandas as pd import requests #import datetimepi import json i=1 lfasr_host = 'http://raasr.xfyun.cn/api' # 请求的接口名 api_prepare = '/prepare' api_upload = '/upload' api_merge = '/merge' api_get_progress = '/getProgress' api_get_result = '/getResult' # 文件分片大小10M file_piece_sice = 10485760 # ——————————————————转写可配置参数———————————————— # 参数可在官网界面(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根据需求可自行在gene_params方法里添加修改 # 转写类型 lfasr_type = 0 # 是否开启分词 has_participle = 'false' has_seperate = 'true' # 多候选词个数 max_alternatives = 0 # 子用户标识 suid = '' class SliceIdGenerator: """slice id生成器""" def __init__(self): self.__ch = 'aaaaaaaaa`' def getNextSliceId(self): ch = self.__ch j = len(ch) - 1 while j >= 0: cj = ch[j] if cj != 'z': ch = ch[:j] + chr(ord(cj) + 1) + ch[j + 1:] break else: ch = ch[:j] + 'a' + ch[j + 1:] j = j - 1 self.__ch = ch return self.__ch class RequestApi(object): def __init__(self, appid, secret_key, upload_file_path): self.appid = appid self.secret_key = secret_key self.upload_file_path = upload_file_path # 根据不同的apiname生成不同的参数,本示例中未使用全部参数您可在官网(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看后选择适合业务场景的进行更换 def gene_params(self, apiname, taskid=None, slice_id=None): appid = self.appid secret_key = self.secret_key upload_file_path = self.upload_file_path ts = str(int(time.time())) m2 = hashlib.md5() m2.update((appid + ts).encode('utf-8')) md5 = m2.hexdigest() md5 = bytes(md5, encoding='utf-8') # 以secret_key为key, 上面的md5为msg, 使用hashlib.sha1加密结果为signa signa = hmac.new(secret_key.encode('utf-8'), md5, hashlib.sha1).digest() signa = base64.b64encode(signa) signa = str(signa, 'utf-8') file_len = os.path.getsize(upload_file_path) file_name = os.path.basename(upload_file_path) param_dict = {} if apiname == api_prepare: # slice_num是指分片数量,如果您使用的音频都是较短音频也可以不分片,直接将slice_num指定为1即可 slice_num = int(file_len / file_piece_sice) + (0 if (file_len % file_piece_sice == 0) else 1) param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['file_len'] = str(file_len) param_dict['file_name'] = file_name param_dict['slice_num'] = str(slice_num) param_dict['speaker_number'] = '2' param_dict['has_seperate'] = 'true' elif apiname == api_upload: param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['task_id'] = taskid param_dict['slice_id'] = slice_id elif apiname == api_merge: param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['task_id'] = taskid param_dict['file_name'] = file_name elif apiname == api_get_progress or apiname == api_get_result: param_dict['app_id'] = appid param_dict['signa'] = signa param_dict['ts'] = ts param_dict['task_id'] = taskid return param_dict # 请求和结果解析,结果中各个字段的含义可参考:https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html def gene_request(self, apiname, data, files=None, headers=None): response = requests.post(lfasr_host + apiname, data=data, files=files, headers=headers) result = json.loads(response.text) if result["ok"] == 0: print("{} success:".format(apiname) + str(result)) return result else: print("{} error:".format(apiname) + str(result)) exit(0) return result # 预处理 def prepare_request(self): return self.gene_request(apiname=api_prepare, data=self.gene_params(api_prepare)) # 上传 def upload_request(self, taskid, upload_file_path): file_object = open(upload_file_path, 'rb') try: index = 1 sig = SliceIdGenerator() while True: content = file_object.read(file_piece_sice) if not content or len(content) == 0: break files = { "filename": self.gene_params(api_upload).get("slice_id"), "content": content } response = self.gene_request(api_upload, data=self.gene_params(api_upload, taskid=taskid, slice_id=sig.getNextSliceId()), files=files) if response.get('ok') != 0: # 上传分片失败 print('upload slice fail, response: ' + str(response)) return False print('upload slice ' + str(index) + ' success') index += 1 finally: 'file index:' + str(file_object.tell()) file_object.close() return True # 合并 def merge_request(self, taskid): return self.gene_request(api_merge, data=self.gene_params(api_merge, taskid=taskid)) # 获取进度 def get_progress_request(self, taskid): return self.gene_request(api_get_progress, data=self.gene_params(api_get_progress, taskid=taskid)) # 获取结果 def get_result_request(self, taskid): return self.gene_request(api_get_result, data=self.gene_params(api_get_result, taskid=taskid)) def all_api_request(self): # 1. 预处理 pre_result = self.prepare_request() taskid = pre_result["data"] # 2 . 分片上传 self.upload_request(taskid=taskid, upload_file_path=self.upload_file_path) # 3 . 文件合并 self.merge_request(taskid=taskid) # 4 . 获取任务进度 while True: # 每隔20秒获取一次任务进度 progress = self.get_progress_request(taskid) progress_dic = progress if progress_dic['err_no'] != 0 and progress_dic['err_no'] != 26605: print('task error: ' + progress_dic['failed']) return else: data = progress_dic['data'] task_status = json.loads(data) if task_status['status'] == 9: print('task ' + taskid + ' finished') break print('The task ' + taskid + ' is in processing, task status: ' + str(data)) # 每次获取进度间隔20S time.sleep(20) # 5 . 获取结果 self.get_result_request(taskid=taskid) # 注意:如果出现requests模块报错:"NoneType" object has no attribute 'read', 请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0) # 输入讯飞开放平台的appid,secret_key和待转写的文件路径 if __name__ == '__main__': #while i<=60: api = RequestApi(appid="5e61b3e9", secret_key="f56b43ae7d48267ba7c5db55f595de5b", upload_file_path=r'D:/明德客服记事录音文件/明德客服记事录音文件/60.wav') #str(i) api.get_result_request('2131ae7c6af34b568b1f7cd156c44e33') #'6f0bc5df8cd54c65a8e4fa79d7bf9df4' list1 = eval(api.get_result_request('2131ae7c6af34b568b1f7cd156c44e33').get('data')) print(type(list1)) print(list1) name = ['bg','ed','onebest','speaker'] test = pd.DataFrame(columns=name,data=list1) print(test) test.to_csv(r'D:\\speech recognition\\识别结果\\dict60.csv',encoding='gbk') #f = open(r'C:\\Users\\user20\\Desktop\\dict.csv', 'w') #w = csv.DictWriter(f,api.get_result_request('2e595d5b1192427893c1f8cacb42c519').keys()) #w.writerow(api.get_result_request('2e595d5b1192427893c1f8cacb42c519')) #f.close() #i+=1 #api.all_api_request()

4.其中appid,secrect_key都在平台中获取。

5.由于要识别的音频文件不只一个,因此我们先运行while循环语句将60替换为str(i),以字符链接的方式相加,由于python对格式要求较为严格,因此要注意格式对齐!!!
6.将其他注释掉,就下while循环语句+api.all_api_request()来获取每个音频的id每运行出现一个id,将它复制黏贴到记事本里(或者其他可以添加文本的文件),本人添加在记事本中:
在这里插入图片描述
7.最后就要开始识别文字了,将每个音频的地址输入到upload_file_path中,随后再将与之对应的id复制到api_get_result_request中,在上述代码中有两处需要修改,每执行一次,就要修改一次,切记文件名和id要相对应(!!!)
8.在运行这些代码之前,最好先将音频转化为.wav格式,可以运用格式工厂,进行相应的转化(!!!)
格式工厂下载网址:
http://www.pcfreetime.com/formatfactory/CN/index.html
9.最后运行python代码即可.


作者:L C H



讯飞 调用 科大讯飞 语音识别 Python

需要 登录 后方可回复, 如果你还没有账号请 注册新账号