# RawPCMStreamNode (StreamNode, or InputStreamNode) #97
## Description

### Describe the feature
```js
const ac = new AudioContext();
const aw = new AudioWorkletNode(ac, 'processor');
const {body: readable} = await fetch('/path/to/raw_pcm_output');
const rawPCMStreamNode = new RawPCMStreamNode(ac, readable);
rawPCMStreamNode.connect(aw);
```
### Is there a prototype?
Yes. It is rather involved, illustrating some of the deficiencies of `SharedArrayBuffer`, and even of dynamic `SharedArrayBuffer` memory growth using `WebAssembly.Memory.prototype.grow()`; hence this feature request to simplify the process.
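For context, a minimal sketch of the growth mechanism in question; the page counts here are illustrative, not the prototype's values:

```js
// A shared, growable WebAssembly.Memory viewed as bytes.
const memory = new WebAssembly.Memory({ initial: 1, maximum: 16, shared: true });
let bytes = new Uint8Array(memory.buffer); // 1 page = 65536 bytes
// grow(1) appends one 64 KiB page; it throws a RangeError when the engine
// cannot grow the instance memory (the failure mode demonstrated below).
memory.grow(1);
bytes = new Uint8Array(memory.buffer); // re-create the view over the grown buffer
console.log(memory.buffer.byteLength); // 131072
```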
From wasmerio/wasmer-php#121 (comment)
Put together a minimal, complete, verifiable working example of streaming from `fetch()` to `AudioWorkletProcessor` via Native Messaging, including the observable restriction which led to filing this issue, and the issues referenced above: specifically, `RangeError` being thrown, and limitations on the amount of memory that can be pre-allocated as an `ArrayBuffer` or `SharedArrayBuffer`. Posting the code here because the ongoing experiments not infrequently crash the browser and/or the OS.
**Flow chart**
- Create a Native Messaging Chromium app (`app`)
- At an arbitrary web page, dynamically set permissions for the app using Native File System to re-write `manifest.json` in the `app/` folder by setting the `externally_connectable` value
- Connect to the background script, `background.js`, which passes messages between the Native Messaging host (in this case we use bash and execute PHP within the shell script) and the arbitrary web page
- We do not use the Native Messaging protocol to pass STDOUT to `background.js` and then to the web page; instead, we start a PHP built-in server process at localhost to handle the subsequent call to `fetch()`, with an `AbortController` `signal` set within the init object passed to `fetch()`, and read STDOUT as a `ReadableStream`, converting to and from `JSON` and `Uint8Array`
- Create a `MediaStreamAudioDestinationNode` for the ability to record, and to perform other tasks with the `MediaStream` and `MediaStreamTrack`, of the live audio stream of system audio capture
- Use Transferable Streams to post `response.body` (a `ReadableStream`) to the `AudioWorkletProcessor`, where we pipe the stream to a `WritableStream` and write `Uint8Array`s to a single `Uint8Array` pre-allocated to a set amount (`344 * 512 * 60 * 60` throws "cannot allocate memory" here; see the arithmetic sketch after this list)
- Store minimal data before `process()` is started by resuming the `AudioContext`
- Pass a message to `background.js` to kill the PHP built-in server process
- Close the `ReadableStreamDefaultController`
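A sketch of the arithmetic behind the pre-allocation figure above, assuming 44100 Hz, 2-channel, 16-bit raw PCM (`344 * 512` is the prototype's approximation of one second of data):

```js
const bytesPerSecond = 44100 * 2 * 2; // 176400 bytes/s of raw PCM
const approxOneSecond = 344 * 512;    // 176128, roughly one second of data
const oneHour = 344 * 512 * 60 * 60;  // 634060800 bytes, ~605 MiB
// Pre-allocating one hour up front, e.g. new ArrayBuffer(634060800),
// is where "cannot allocate memory" was observed.
```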
Chromium Native Messaging extension:
**app/manifest.json**
```json
{
  // Extension ID: <id>
  "key": "<key>",
  "name": "native_messaging_stream",
  "version": "1.0",
  "manifest_version": 2,
  "description": "Capture system audio",
  "icons": {},
  "permissions": ["nativeMessaging", "*://localhost/"],
  "app": {
    "background": {
      "scripts": ["background.js"],
      "persistent": false
    }
  },
  "externally_connectable": {
    "matches": [
      "https://developer.mozilla.org/*",
      "http://localhost/*"
    ],
    "ids": ["lmkllhfhgnmbcmhdagfhejkpicehnida"]
  },
  "author": "guest271314"
}
```
**background.js**
```js
const id = 'native_messaging_stream';
let externalPort, controller, signal;

chrome.runtime.onConnectExternal.addListener(port => {
  console.log(port);
  externalPort = port;
  externalPort.onMessage.addListener(message => {
    if (message === 'start') {
      chrome.runtime.sendNativeMessage(id, {}, async _ => {
        console.log(_);
        if (chrome.runtime.lastError) {
          console.warn(chrome.runtime.lastError.message);
        }
        controller = new AbortController();
        signal = controller.signal;
        // wait until bash script completes, server starts
        for await (const _ of (async function* stream() {
          while (true) {
            try {
              if ((await fetch('http://localhost:8000', { method: 'HEAD' })).ok)
                break;
            } catch (e) {
              console.warn(e.message);
              yield;
            }
          }
        })());
        try {
          const response = await fetch('http://localhost:8000?start=true', {
            cache: 'no-store',
            mode: 'cors',
            method: 'get',
            signal,
          });
          console.log(...response.headers);
          const readable = response.body;
          readable
            .pipeTo(
              new WritableStream({
                write: async value => {
                  // value is a Uint8Array, postMessage() here only supports cloning, not transfer
                  externalPort.postMessage(JSON.stringify(value));
                },
              })
            )
            .catch(err => {
              console.warn(err);
              externalPort.postMessage('done');
            });
        } catch (err) {
          console.error(err);
        }
      });
    }
    if (message === 'stop') {
      controller.abort();
      chrome.runtime.sendNativeMessage(id, {}, _ => {
        if (chrome.runtime.lastError) {
          console.warn(chrome.runtime.lastError.message);
        }
        console.log('everything should be done');
      });
    }
  });
});
```
**set_externally_connectable.js**
```js
(async (
  set_externally_connectable = ['https://example.com/*'],
  unset_externally_connectable = true
) => {
  const dir = await self.showDirectoryPicker();
  const status = await dir.requestPermission({ writable: true });
  const fileHandle = await dir.getFileHandle('manifest.json', { create: false });
  const file = await fileHandle.getFile();
  const manifest_text = await file.text();
  const match_extension_id = /\/\/ Extension ID: \w{32}/;
  const [extension_id] = manifest_text.match(match_extension_id);
  let text = manifest_text.replace(match_extension_id, `"_": 0,`);
  const manifest_json = JSON.parse(text);
  manifest_json.externally_connectable.matches = unset_externally_connectable
    ? set_externally_connectable
    : [
        ...manifest_json.externally_connectable.matches,
        ...set_externally_connectable,
      ];
  const writer = await fileHandle.createWritable({ keepExistingData: false });
  await writer.write(
    JSON.stringify(manifest_json, null, 2).replace(/"_": 0,/, extension_id)
  );
  return await writer.close();
})([`${location.origin}/*`]);
```
**host/native_messaging_stream.json** (`$ cp native_messaging_stream.json ~/.config/chromium/NativeMessagingHosts`)

```json
{
  "name": "native_messaging_stream",
  "description": "Capture system audio",
  "path": "/path/to/host/captureSystemAudio.sh",
  "type": "stdio",
  "allowed_origins": ["chrome-extension://<id>/"],
  "author": "guest271314"
}
```
**index.php**
```php
<?php
  if ($_SERVER['REQUEST_METHOD'] == 'HEAD') {
    header('Vary: Origin');
    header("Access-Control-Allow-Origin: chrome-extension://lmkllhfhgnmbcmhdagfhejkpicehnida");
    header("Access-Control-Allow-Methods: GET, OPTIONS, HEAD");
    header("Access-Control-Allow-Headers: Content-Type, Access-Control-Allow-Headers");
    header("X-Powered-By:");
    header("HTTP/1.1 200 OK");
    die();
  }
  if (isset($_GET["start"])) {
    header('Vary: Origin');
    header("Access-Control-Allow-Origin: chrome-extension://lmkllhfhgnmbcmhdagfhejkpicehnida");
    header("Access-Control-Allow-Methods: GET, OPTIONS, HEAD");
    header("Access-Control-Allow-Headers: Content-Type, Access-Control-Allow-Headers");
    header("Content-Type: text/plain");
    header("X-Powered-By:");
    echo passthru("parec -v --raw -d alsa_output.pci-0000_00_1b.0.analog-stereo.monitor");
    exit();
  }
```
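The endpoint can be probed directly before wiring up the extension; a sketch using the same URLs `background.js` polls and fetches (assumes the server and `parec` are already running):

```js
// HEAD request: succeeds once the PHP built-in server is up.
const head = await fetch('http://localhost:8000', { method: 'HEAD' });
console.log(head.ok);
// GET ?start=true: the response body streams raw PCM from parec.
const response = await fetch('http://localhost:8000?start=true');
const reader = response.body.getReader();
const { value } = await reader.read();
console.log(value instanceof Uint8Array, value.length); // true, <chunk size>
await reader.cancel(); // stop reading; the server-side process is killed separately
```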
**captureSystemAudio.sh**
```bash
#!/bin/bash
sendMessage() {
  # https://stackoverflow.com/a/24777120
  message='{"message": "ok"}'
  # Calculate the byte size of the string.
  # NOTE: This assumes that byte length is identical to the string length!
  # Do not use multibyte (unicode) characters, escape them instead, e.g.
  # message='"Some unicode character:\u1234"'
  messagelen=${#message}
  # Convert to an integer in native byte order.
  # If you see an error message in Chrome's stdout with
  # "Native Messaging host tried sending a message that is ... bytes long.",
  # then just swap the order, i.e. messagelen1 <-> messagelen4 and
  # messagelen2 <-> messagelen3
  messagelen1=$(( ($messagelen      ) & 0xFF ))
  messagelen2=$(( ($messagelen >>  8) & 0xFF ))
  messagelen3=$(( ($messagelen >> 16) & 0xFF ))
  messagelen4=$(( ($messagelen >> 24) & 0xFF ))
  # Print the message byte length followed by the actual message.
  printf "$(printf '\\x%x\\x%x\\x%x\\x%x' \
    $messagelen1 $messagelen2 $messagelen3 $messagelen4)%s" "$message"
}
captureSystemAudio() {
  if pgrep -f php > /dev/null; then
    killall -9 php &
    sendMessage
  else
    php -S localhost:8000 -t /path/to/host/ &
    sendMessage
  fi
}
captureSystemAudio
```
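For illustration, the 4-byte little-endian length header computed by `sendMessage` above works out as follows for the 17-byte example message (a worked example in JavaScript, not additional protocol machinery):

```js
// Native Messaging length header for '{"message": "ok"}' (17 bytes).
const message = '{"message": "ok"}';
const len = message.length; // 17
const header = [
  len & 0xff,         // 0x11
  (len >> 8) & 0xff,  // 0x00
  (len >> 16) & 0xff, // 0x00
  (len >> 24) & 0xff, // 0x00
];
// The host writes these 4 bytes, then the JSON payload itself.
console.log(header); // [17, 0, 0, 0]
```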
At the arbitrary web page, in this case MDN, where we previously set the matching URL pattern at `externally_connectable` for permission to communicate with the Chromium Native Messaging application:
```js
var id = 'lmkllhfhgnmbcmhdagfhejkpicehnida';
var port = chrome.runtime.connect(id);
var controller;
var readable = new ReadableStream({
  start(c) {
    return (controller = c);
  },
});
var init = false;
port.onMessage.addListener(async message => {
  if (message !== 'done') {
    if (!init) {
      init = true;
      // stub: the real AudioWorkletProcessor base class exists only in AudioWorkletGlobalScope
      class AudioWorkletProcessor {}
      class AudioWorkletNativeMessageStream extends AudioWorkletProcessor {
        constructor(options) {
          super(options);
          this.byteSize = 512 * 384 * 60 * 1; // ~1 minute of data (512 * 384 bytes ≈ 1 second)
          this.memory = new Uint8Array(this.byteSize); // TODO: grow memory, dynamically
          Object.assign(this, options.processorOptions);
          this.port.onmessage = this.appendBuffers.bind(this);
        }
        async appendBuffers({ data: readable }) {
          Object.assign(this, { readable });
          const source = {
            write: (value, controller) => {
              console.log(globalThis.currentTime % 3);
              if (this.totalBytes + value.byteLength < this.byteSize) {
                this.memory.set(value, this.readOffset);
                this.readOffset = this.readOffset + value.buffer.byteLength;
                this.totalBytes = this.readOffset;
              } else {
                const last = value.subarray(0, this.byteSize - this.totalBytes);
                this.memory.set(last, this.readOffset);
                this.readOffset = this.readOffset + last.length;
                this.totalBytes = this.readOffset;
                console.log(value.buffer.byteLength, this.readOffset, this.totalBytes);
                controller.close();
              }
              // await 250 milliseconds of audio data
              if (this.totalBytes >= (512 * 384) / 4 && !this.started) {
                this.started = true;
                this.port.postMessage({ started: this.started });
              }
            },
            close() {
              console.log('stopped');
            },
          };
          try {
            await this.readable.pipeTo(new WritableStream(source));
          } catch (e) {
            console.warn(e);
            console.log(this.writeOffset, this.totalBytes);
            this.endOfStream();
          }
        }
        endOfStream() {
          this.port.postMessage({
            ended: true,
            currentTime,
            currentFrame,
            readOffset: this.readOffset,
            writeOffset: this.writeOffset,
            totalBytes: this.totalBytes,
          });
        }
        process(inputs, outputs) {
          const channels = outputs.flat();
          if (this.writeOffset >= this.totalBytes || this.totalBytes === this.byteSize) {
            console.log(this);
            this.endOfStream();
            return false;
          }
          const uint8 = new Uint8Array(512);
          try {
            for (let i = 0; i < 512; i++, this.writeOffset++) {
              if (this.writeOffset === this.byteSize) {
                break;
              }
              uint8[i] = this.memory[this.writeOffset];
            }
            const uint16 = new Uint16Array(uint8.buffer);
            // https://stackoverflow.com/a/35248852
            for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
              const int = uint16[i];
              // If the high bit is on, then it is a negative number, and actually counts backwards.
              const float = int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
              // interleave
              channels[(n = ++n % 2)][!n ? j++ : j - 1] = float;
            }
            // console.log(channels[0]);
          } catch (e) {
            console.error(e);
          }
          return true;
        }
      }
      // register processor in AudioWorkletGlobalScope
      function registerProcessor(name, processorCtor) {
        return `${processorCtor};\nregisterProcessor('${name}', ${processorCtor.name});`;
      }
      const worklet = URL.createObjectURL(
        new Blob(
          [
            registerProcessor(
              'audio-worklet-native-message-stream',
              AudioWorkletNativeMessageStream
            ),
          ],
          { type: 'text/javascript' }
        )
      );
      const ac = new AudioContext({
        latencyHint: 1,
        sampleRate: 44100,
        numberOfChannels: 2,
      });
      ac.onstatechange = e => console.log(ac.state);
      if (ac.state === 'running') {
        await ac.suspend();
      }
      await ac.audioWorklet.addModule(worklet);
      const aw = new AudioWorkletNode(ac, 'audio-worklet-native-message-stream', {
        numberOfInputs: 1,
        numberOfOutputs: 2,
        channelCount: 2,
        processorOptions: {
          totalBytes: 0,
          readOffset: 0,
          writeOffset: 0,
          done: false,
          ended: false,
          started: false,
        },
      });
      aw.onprocessorerror = e => {
        console.error(e);
        console.trace();
      };
      const msd = new MediaStreamAudioDestinationNode(ac);
      const { stream } = msd;
      const [track] = stream.getAudioTracks();
      aw.connect(msd);
      const recorder = new MediaRecorder(stream);
      recorder.ondataavailable = e => console.log(URL.createObjectURL(e.data));
      aw.port.onmessage = async e => {
        console.log(e.data);
        if (
          e.data.started &&
          ac.state === 'suspended' &&
          recorder.state === 'inactive'
        ) {
          recorder.start();
          await ac.resume();
          setTimeout(_ => {
            port.postMessage('stop');
            controller.close();
          }, 1000 * 60 * 1);
        } else if (recorder.state === 'recording') {
          recorder.stop();
          track.stop();
          aw.disconnect();
          msd.disconnect();
          await ac.close();
          console.log(track);
          gc();
        }
      };
      aw.port.postMessage(readable, [readable]);
    }
    if (readable.locked)
      controller.enqueue(Uint8Array.from(Object.values(JSON.parse(message))));
  } else {
    if (readable.locked) controller.close();
  }
});
port.postMessage('start');
```
A version using `WebAssembly.Memory.prototype.grow()` to dynamically grow memory during the stream follows. The expected result is 30 minutes of audio being written to `SharedArrayBuffer`. Testing on a legacy 4.15.0-20-lowlatency 32-bit kernel, we attempted to grow memory to 30 minutes of data, with the maximum set to 1 hour of data.
An error was thrown when calling `grow(1)` while the current offset was 177143808:

```
before grow 177012736
after grow 177078272
before grow 177078272
after grow 177143808
before grow 177143808
e4056867-5209-4bea-8f76-a86106ab83af:75 RangeError: WebAssembly.Memory.grow(): Unable to grow instance memory.
    at Memory.grow (<anonymous>)
    at Object.write (e4056867-5209-4bea-8f76-a86106ab83af:26)
appendBuffers @ e4056867-5209-4bea-8f76-a86106ab83af:75
async function (async)
appendBuffers @ e4056867-5209-4bea-8f76-a86106ab83af:73
177111040 177139656
{ended: true, currentTime: 1004.0308390022676, currentFrame: 44277760, readOffset: 177139656, writeOffset: 177111040, …}
MediaStreamTrack {kind: "audio", id: "e0958602-f0ea-4565-9e34-f624afce7c12", label: "MediaStreamAudioDestinationNode", enabled: true, muted: false, …}
blob:https://developer.mozilla.org/f9d716ff-c65e-4ddf-b7c7-5e385d0602ec
closed
```
This resulted in 16 minutes and 43 seconds of data being written to memory, observable in the recorded audio.
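The logged offsets are consistent with that duration, assuming 44100 Hz, 2-channel, 16-bit samples:

```js
// Sanity-check the logged figures against the reported duration.
const bytesPerSecond = 44100 * 2 * 2;       // 176400
const seconds = 177139656 / bytesPerSecond; // ≈ 1004 s, ≈ 16.7 minutes
const fromFrames = 44277760 / 44100;        // ≈ 1004.03 s, matches currentTime
```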
Asked whether 1 GB is the maximum the underlying memory can grow to on 32-bit architectures at https://bugs.chromium.org/p/v8/issues/detail?id=7881.
```js
if (globalThis.gc) gc();
var id = 'lmkllhfhgnmbcmhdagfhejkpicehnida';
var port = chrome.runtime.connect(id);
var controller;
var readable = new ReadableStream({
  start(c) {
    return (controller = c);
  },
});
var init = false;
port.onMessage.addListener(async message => {
  if (message !== 'done') {
    if (!init) {
      init = true;
      // stub: the real AudioWorkletProcessor base class exists only in AudioWorkletGlobalScope
      class AudioWorkletProcessor {}
      class AudioWorkletNativeMessageStream extends AudioWorkletProcessor {
        constructor(options) {
          super(options);
          this.initial = (384 * 512) / 65536; // 3 pages, ≈ 1 second of data
          this.maximum = (384 * 512 * 60 * 60) / 65536; // 10800 pages, ≈ 1 hour of data
          this.byteSize = this.maximum * 65536;
          this.memory = new WebAssembly.Memory({
            initial: this.initial,
            maximum: this.maximum,
            shared: true,
          });
          Object.assign(this, options.processorOptions);
          this.port.onmessage = this.appendBuffers.bind(this);
        }
        async appendBuffers({ data: readable }) {
          Object.assign(this, { readable });
          const source = {
            write: (value, controller) => {
              if (
                this.totalBytes + value.byteLength > this.memory.buffer.byteLength &&
                this.totalBytes + value.buffer.byteLength < this.byteSize
              ) {
                console.log('before grow', this.memory.buffer.byteLength);
                this.memory.grow(1);
                console.log('after grow', this.memory.buffer.byteLength);
              }
              const uint8_sab = new Uint8Array(this.memory.buffer, this.readOffset);
              // console.log(this.totalBytes, this.totalBytes % 65536, this.totalBytes / 65536, this.memory.buffer.byteLength);
              if (this.totalBytes + value.buffer.byteLength < this.byteSize) {
                for (let i = 0; i < value.buffer.byteLength; i++, this.readOffset++) {
                  uint8_sab[this.readOffset] = value[i];
                }
                this.totalBytes = this.readOffset;
              } else {
                const lastBytes = value.subarray(0, this.byteSize - this.totalBytes);
                // const uint8 = new Uint8Array(this.memory.buffer, this.readOffset);
                for (let i = 0; i < lastBytes.buffer.byteLength; i++, this.readOffset++) {
                  uint8_sab[this.readOffset] = value[i];
                }
                this.totalBytes = this.readOffset;
                console.log(value.buffer.byteLength, this.readOffset, this.totalBytes);
                this.readable.cancel();
              }
              // accumulate 250 milliseconds of data before resuming AudioContext
              if (this.totalBytes >= (512 * 384) / 4 && !this.started) {
                this.started = true;
                this.port.postMessage({ started: this.started });
              }
            },
            close() {
              console.log('stream closed');
            },
          };
          try {
            await this.readable.pipeTo(new WritableStream(source));
          } catch (e) {
            console.warn(e);
            console.log(this.writeOffset, this.totalBytes);
            this.endOfStream();
          }
        }
        endOfStream() {
          this.port.postMessage({
            ended: true,
            currentTime,
            currentFrame,
            readOffset: this.readOffset,
            writeOffset: this.writeOffset,
            totalBytes: this.totalBytes,
          });
        }
        process(inputs, outputs) {
          const channels = outputs.flat();
          if (this.writeOffset >= this.totalBytes || this.totalBytes === this.byteSize) {
            console.log(this);
            this.endOfStream();
            return false;
          }
          const uint8 = new Uint8Array(512);
          const uint8_sab = new Uint8Array(this.memory.buffer, this.writeOffset);
          // .slice(this.writeOffset, this.writeOffset + 512)
          try {
            for (let i = 0; i < 512; i++, this.writeOffset++) {
              if (this.writeOffset === this.byteSize) {
                break;
              }
              uint8[i] = uint8_sab[this.writeOffset];
            }
            const uint16 = new Uint16Array(uint8.buffer);
            // https://stackoverflow.com/a/35248852
            for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
              const int = uint16[i];
              // If the high bit is on, then it is a negative number, and actually counts backwards.
              const float = int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
              // interleave
              channels[(n = ++n % 2)][!n ? j++ : j - 1] = float;
            }
            // console.log(channels[0]);
          } catch (e) {
            console.error(e);
          }
          return true;
        }
      }
      // register processor in AudioWorkletGlobalScope
      function registerProcessor(name, processorCtor) {
        return `${processorCtor};\nregisterProcessor('${name}', ${processorCtor.name});`;
      }
      const worklet = URL.createObjectURL(
        new Blob(
          [
            registerProcessor(
              'audio-worklet-native-message-stream',
              AudioWorkletNativeMessageStream
            ),
          ],
          { type: 'text/javascript' }
        )
      );
      const ac = new AudioContext({
        latencyHint: 1,
        sampleRate: 44100,
        numberOfChannels: 2,
      });
      ac.onstatechange = e => console.log(ac.state);
      if (ac.state === 'running') {
        await ac.suspend();
      }
      await ac.audioWorklet.addModule(worklet);
      const aw = new AudioWorkletNode(
        ac,
        'audio-worklet-native-message-stream',
        {
          numberOfInputs: 1,
          numberOfOutputs: 2,
          channelCount: 2,
          processorOptions: {
            totalBytes: 0,
            readOffset: 0,
            writeOffset: 0,
            done: false,
            ended: false,
            started: false,
          },
        }
      );
      aw.onprocessorerror = e => {
        console.error(e);
        console.trace();
      };
      const msd = new MediaStreamAudioDestinationNode(ac);
      const { stream } = msd;
      const [track] = stream.getAudioTracks();
      aw.connect(msd);
      const recorder = new MediaRecorder(stream);
      recorder.ondataavailable = e => console.log(URL.createObjectURL(e.data));
      aw.port.onmessage = async e => {
        console.log(e.data);
        if (
          e.data.started &&
          ac.state === 'suspended' &&
          recorder.state === 'inactive'
        ) {
          recorder.start();
          await ac.resume();
          setTimeout(_ => {
            port.postMessage('stop');
            controller.close();
          }, 1000 * 60 * 30);
        } else if (recorder.state === 'recording') {
          recorder.stop();
          track.stop();
          aw.disconnect();
          msd.disconnect();
          await ac.close();
          console.log(track);
          gc();
        }
      };
      aw.port.postMessage(readable, [readable]);
    }
    if (readable.locked)
      controller.enqueue(Uint8Array.from(Object.values(JSON.parse(message))));
  } else {
    if (readable.locked) controller.close();
  }
});
port.postMessage('start');
```
Ideally, in this case, we should be able to stream directly to `AudioWorkletProcessor.process()` instead of writing to memory first, i.e., the gist of the proposal at Chromium: Use Streams API for `STDIN` and `STDOUT`, flushing memory as we proceed. One approach to reduce memory usage would be to write Opus audio to memory and decode it to raw PCM as `Float32Array` values at `outputs` channels, which we have not yet tried.
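As a rough estimate of the potential saving (the 128 kbit/s Opus bitrate is an assumed typical value, not a measurement):

```js
const rawPcmBitsPerSecond = 44100 * 2 * 16; // 1411200 bits/s of raw PCM
const opusBitsPerSecond = 128 * 1000;       // 128000 bits/s, typical Opus bitrate
const ratio = rawPcmBitsPerSecond / opusBitsPerSecond; // ≈ 11x less buffered memory
```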
### Describe the feature in more detail
Internally, `RawPCMStreamNode` (or simply `StreamNode` or `InputStreamNode`) is a `TransformStream` that accepts a `ReadableStream` (`readable`) of raw PCM, transforms the raw data (the terms "interleaved" and "planar" are no longer included in Web Audio API v1) into `Float32Array`(s) corresponding to the number of channels contained in the input stream, and sets the `Float32Array`(s) as `outputs` in `process()`.
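A minimal sketch of that transform step, assuming interleaved signed 16-bit PCM input; the function name and fixed scale factor are illustrative, not part of the proposal:

```js
// Hypothetical inner transform: interleaved 16-bit PCM bytes in,
// one Float32Array per channel out, suitable for copying into outputs.
function pcm16ToFloat32Channels(uint8, numberOfChannels = 2) {
  // Assumes uint8.byteOffset is even so the Int16Array view is aligned.
  const int16 = new Int16Array(uint8.buffer, uint8.byteOffset, uint8.byteLength >> 1);
  const frames = Math.floor(int16.length / numberOfChannels);
  const channels = Array.from(
    { length: numberOfChannels },
    () => new Float32Array(frames)
  );
  for (let frame = 0; frame < frames; frame++) {
    for (let c = 0; c < numberOfChannels; c++) {
      // Scale [-32768, 32767] to the nominal [-1, 1) range.
      channels[c][frame] = int16[frame * numberOfChannels + c] / 0x8000;
    }
  }
  return channels;
}
```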
Externally, this will work in the same way as `MediaElementAudioSourceNode`:
```html
<body>
  <audio controls autoplay crossOrigin="anonymous"></audio>
  <script>
    (async () => {
      const context = new AudioContext({ latencyHint: 1.0 });
      await context.suspend();
      const mediaElement = document.querySelector("audio");
      mediaElement.onloadedmetadata = async e => await context.resume();
      const source = new MediaElementAudioSourceNode(context, {
        mediaElement
      });
      await context.audioWorklet.addModule("bypass-processor.js");
      const bypasser = new AudioWorkletNode(context, "bypass-processor");
      source.connect(bypasser);
      bypasser.connect(context.destination);
      mediaElement.src = "https://ia800301.us.archive.org/10/items/DELTAnine2013-12-11.WAV/Deltanine121113Pt3Wav.wav";
    })();
  </script>
</body>
```
where, when connected to an `AudioWorkletNode`, `AudioWorkletProcessor.process()` effectively takes input from the `src` of the `HTMLMediaElement` and the resulting audio pipeline, and outputs `Float32Array`s at `outputs`.
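The HTML example references a bypass-processor.js module; a minimal pass-through implementation might look like:

```js
// bypass-processor.js: copies each input channel to the corresponding output unchanged.
class BypassProcessor extends AudioWorkletProcessor {
  process(inputs, outputs) {
    const input = inputs[0];
    const output = outputs[0];
    for (let channel = 0; channel < input.length; channel++) {
      output[channel].set(input[channel]);
    }
    return true; // keep the processor alive
  }
}
registerProcessor('bypass-processor', BypassProcessor);
```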
This will provide an entry point for users to utilize `AudioContext` for non-standard or not-yet-implemented device capture and processing with the Web Audio API.
WebCodecs does not solve this - unless that solution can be unequivocally proven here by evidence, not conjecture.
Related art:
Related issue: