# Copyright: Ankitects Pty Ltd and contributors # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html import atexit import html import os import random import subprocess import sys import threading import time from typing import Any, Callable, Dict, List, Optional, Tuple from anki.lang import _ from anki.sound import allSounds from anki.utils import isLin, isMac, isWin, tmpdir from aqt import gui_hooks from aqt.mpv import MPV, MPVBase from aqt.qt import * from aqt.utils import restoreGeom, saveGeom, showWarning def getAudio(parent, encode=True): "Record and return filename" # record first if not Recorder: showWarning("pyaudio not installed") return r = Recorder() mb = QMessageBox(parent) restoreGeom(mb, "audioRecorder") mb.setWindowTitle("Anki") mb.setIconPixmap(QPixmap(":/icons/media-record.png")) but = QPushButton(_("Save")) mb.addButton(but, QMessageBox.AcceptRole) but.setDefault(True) but = QPushButton(_("Cancel")) mb.addButton(but, QMessageBox.RejectRole) mb.setEscapeButton(but) t = time.time() r.start() time.sleep(r.startupDelay) QApplication.instance().processEvents() while not mb.clickedButton(): txt = _("Recording...
Time: %0.1f") mb.setText(txt % (time.time() - t)) mb.show() QApplication.instance().processEvents() if mb.clickedButton() == mb.escapeButton(): r.stop() r.cleanup() return saveGeom(mb, "audioRecorder") # ensure at least a second captured while time.time() - t < 1: time.sleep(0.1) r.stop() # process r.postprocess(encode) return r.file() # Shared utils ########################################################################## def playFromText(text) -> None: for match in allSounds(text): # filename is html encoded match = html.unescape(match) play(match) # Packaged commands ########################################################################## # return modified command array that points to bundled command, and return # required environment def _packagedCmd(cmd) -> Tuple[Any, Dict[str, str]]: cmd = cmd[:] env = os.environ.copy() if "LD_LIBRARY_PATH" in env: del env["LD_LIBRARY_PATH"] if isMac: dir = os.path.dirname(os.path.abspath(__file__)) exeDir = os.path.abspath(dir + "/../../Resources/audio") else: exeDir = os.path.dirname(os.path.abspath(sys.argv[0])) if isWin and not cmd[0].endswith(".exe"): cmd[0] += ".exe" path = os.path.join(exeDir, cmd[0]) if not os.path.exists(path): return cmd, env cmd[0] = path return cmd, env ########################################################################## processingSrc = "rec.wav" processingDst = "rec.mp3" processingChain: List[List[str]] = [] recFiles: List[str] = [] processingChain = [ ["lame", processingSrc, processingDst, "--noreplaygain", "--quiet"], ] # don't show box on windows si: Optional[Any] if sys.platform == "win32": si = subprocess.STARTUPINFO() # pytype: disable=module-attr try: si.dwFlags |= subprocess.STARTF_USESHOWWINDOW # pytype: disable=module-attr except: # pylint: disable=no-member # python2.7+ si.dwFlags |= ( subprocess._subprocess.STARTF_USESHOWWINDOW ) # pytype: disable=module-attr else: si = None def retryWait(proc) -> Any: # osx throws interrupted system call errors frequently while 1: try: return proc.wait() except OSError: continue # MPV ########################################################################## _player: Optional[Callable[[Any], Any]] _queueEraser: Optional[Callable[[], Any]] mpvManager: Optional["MpvManager"] = None mpvPath, mpvEnv = _packagedCmd(["mpv"]) class MpvManager(MPV): executable = mpvPath[0] popenEnv = mpvEnv if not isLin: default_argv = MPVBase.default_argv + [ "--input-media-keys=no", ] def __init__(self) -> None: super().__init__(window_id=None, debug=False) def queueFile(self, file: str) -> None: gui_hooks.mpv_will_play_hook(file) path = os.path.join(os.getcwd(), file) self.command("loadfile", path, "append-play") def clearQueue(self) -> None: self.command("stop") def togglePause(self) -> None: self.set_property("pause", not self.get_property("pause")) def seekRelative(self, secs) -> None: self.command("seek", secs, "relative") def on_idle(self) -> None: gui_hooks.mpv_did_idle_hook() def setMpvConfigBase(base) -> None: mpvConfPath = os.path.join(base, "mpv.conf") MpvManager.default_argv += [ "--no-config", "--include=" + mpvConfPath, ] def setupMPV() -> None: global mpvManager, _player, _queueEraser mpvManager = MpvManager() _player = mpvManager.queueFile _queueEraser = mpvManager.clearQueue atexit.register(cleanupMPV) def cleanupMPV() -> None: global mpvManager, _player, _queueEraser if mpvManager: mpvManager.close() mpvManager = None _player = None _queueEraser = None # Mplayer in slave mode ########################################################################## # if anki crashes, an old mplayer instance may be left lying around, # which prevents renaming or deleting the profile def cleanupOldMplayerProcesses() -> None: # pylint: disable=import-error import psutil # pytype: disable=import-error exeDir = os.path.dirname(os.path.abspath(sys.argv[0])) for proc in psutil.process_iter(): try: info = proc.as_dict(attrs=["pid", "name", "exe"]) if not info["exe"] or info["name"] != "mplayer.exe": continue # not anki's bundled mplayer if os.path.dirname(info["exe"]) != exeDir: continue print("terminating old mplayer process...") proc.kill() except: print("error iterating mplayer processes") mplayerCmd = ["mplayer", "-really-quiet", "-noautosub"] if isWin: mplayerCmd += ["-ao", "win32"] cleanupOldMplayerProcesses() mplayerQueue: List[str] = [] mplayerEvt = threading.Event() mplayerClear = False class MplayerMonitor(threading.Thread): mplayer: Optional[subprocess.Popen] = None deadPlayers: List[subprocess.Popen] = [] def run(self) -> None: global mplayerClear self.mplayer = None self.deadPlayers = [] while 1: mplayerEvt.wait() mplayerEvt.clear() # clearing queue? if mplayerClear and self.mplayer: try: self.mplayer.stdin.write(b"stop\n") self.mplayer.stdin.flush() except: # mplayer quit by user (likely video) self.deadPlayers.append(self.mplayer) self.mplayer = None # loop through files to play while mplayerQueue: # ensure started if not self.mplayer: self.mplayer = self.startProcess() # pop a file try: item = mplayerQueue.pop(0) except IndexError: # queue was cleared by main thread continue if mplayerClear: mplayerClear = False extra = b"" else: extra = b" 1" cmd = b'loadfile "%s"%s\n' % (item.encode("utf8"), extra) try: self.mplayer.stdin.write(cmd) self.mplayer.stdin.flush() except: # mplayer has quit and needs restarting self.deadPlayers.append(self.mplayer) self.mplayer = None self.mplayer = self.startProcess() self.mplayer.stdin.write(cmd) self.mplayer.stdin.flush() # if we feed mplayer too fast it loses files time.sleep(1) # wait() on finished processes. we don't want to block on the # wait, so we keep trying each time we're reactivated def clean(pl): if pl.poll() is not None: pl.wait() return False else: return True self.deadPlayers = [pl for pl in self.deadPlayers if clean(pl)] def kill(self) -> None: if not self.mplayer: return try: self.mplayer.stdin.write(b"quit\n") self.mplayer.stdin.flush() self.deadPlayers.append(self.mplayer) except: pass self.mplayer = None def startProcess(self) -> subprocess.Popen: try: cmd = mplayerCmd + ["-slave", "-idle"] cmd, env = _packagedCmd(cmd) return subprocess.Popen( cmd, startupinfo=si, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=env, ) except OSError: mplayerEvt.clear() raise Exception("Did you install mplayer?") mplayerManager: Optional[MplayerMonitor] = None def queueMplayer(path) -> None: ensureMplayerThreads() if isWin and os.path.exists(path): # mplayer on windows doesn't like the encoding, so we create a # temporary file instead. oddly, foreign characters in the dirname # don't seem to matter. dir = tmpdir() name = os.path.join( dir, "audio%s%s" % (random.randrange(0, 1000000), os.path.splitext(path)[1]) ) f = open(name, "wb") f.write(open(path, "rb").read()) f.close() # it wants unix paths, too! path = name.replace("\\", "/") mplayerQueue.append(path) mplayerEvt.set() def clearMplayerQueue() -> None: global mplayerClear, mplayerQueue mplayerQueue = [] mplayerClear = True mplayerEvt.set() def ensureMplayerThreads() -> None: global mplayerManager if not mplayerManager: mplayerManager = MplayerMonitor() mplayerManager.daemon = True mplayerManager.start() # ensure the tmpdir() exit handler is registered first so it runs # after the mplayer exit tmpdir() # clean up mplayer on exit atexit.register(stopMplayer) def stopMplayer(*args) -> None: if not mplayerManager: return mplayerManager.kill() if isWin: cleanupOldMplayerProcesses() gui_hooks.profile_will_close_hook.append(stopMplayer) # PyAudio recording ########################################################################## try: import pyaudio import wave PYAU_FORMAT = pyaudio.paInt16 PYAU_CHANNELS = 1 PYAU_INPUT_INDEX: Optional[int] = None except: pyaudio = None class _Recorder: def postprocess(self, encode=True) -> None: self.encode = encode for c in processingChain: # print c if not self.encode and c[0] == "lame": continue try: cmd, env = _packagedCmd(c) ret = retryWait(subprocess.Popen(cmd, startupinfo=si, env=env)) except: ret = True finally: self.cleanup() if ret: raise Exception(_("Error running %s") % " ".join(cmd)) def cleanup(self) -> None: if os.path.exists(processingSrc): os.unlink(processingSrc) class PyAudioThreadedRecorder(threading.Thread): def __init__(self, startupDelay) -> None: threading.Thread.__init__(self) self.startupDelay = startupDelay self.finish = False def run(self) -> Any: chunk = 1024 p = pyaudio.PyAudio() rate = int(p.get_default_input_device_info()["defaultSampleRate"]) wait = int(rate * self.startupDelay) stream = p.open( format=PYAU_FORMAT, channels=PYAU_CHANNELS, rate=rate, input=True, input_device_index=PYAU_INPUT_INDEX, frames_per_buffer=chunk, ) stream.read(wait) data = b"" while not self.finish: data += stream.read(chunk, exception_on_overflow=False) stream.close() p.terminate() wf = wave.open(processingSrc, "wb") wf.setnchannels(PYAU_CHANNELS) wf.setsampwidth(p.get_sample_size(PYAU_FORMAT)) wf.setframerate(rate) wf.writeframes(data) wf.close() class PyAudioRecorder(_Recorder): # discard first 250ms which may have pops/cracks startupDelay = 0.25 def __init__(self): for t in recFiles + [processingSrc, processingDst]: try: os.unlink(t) except OSError: pass self.encode = False def start(self): self.thread = PyAudioThreadedRecorder(startupDelay=self.startupDelay) self.thread.start() def stop(self): self.thread.finish = True self.thread.join() def file(self): if self.encode: tgt = "rec%d.mp3" % time.time() os.rename(processingDst, tgt) return tgt else: return processingSrc if not pyaudio: PyAudioRecorder = None # type: ignore # Audio interface ########################################################################## _player = queueMplayer _queueEraser = clearMplayerQueue def play(path) -> None: _player(path) def clearAudioQueue() -> None: _queueEraser() Recorder = PyAudioRecorder if not Recorder: print("pyaudio not installed") # add everything from this module into anki.sound for backwards compat _exports = [i for i in locals().items() if not i[0].startswith("__")] for (k, v) in _exports: sys.modules["anki.sound"].__dict__[k] = v