This repository was archived by the owner on Oct 7, 2021. It is now read-only.

RawPCMStreamNode (StreamNode, or InputStreamNode) #97

Closed
@guest271314

Description

Describe the feature

const ac = new AudioContext();
const aw = new AudioWorkletNode(ac, 'processor');
const {body: readable} = await fetch('/path/to/raw_pcm_output');
const rawPCMStreamNode = new RawPCMStreamNode(ac, readable);
aw.connect(rawPCMStreamNode);

Is there a prototype?

Yes. It is rather involved, illustrating some of the deficiencies of SharedArrayBuffer, and even of dynamic SharedArrayBuffer memory growth using WebAssembly.Memory.prototype.grow(); hence this feature request to simplify the process.
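The growth mechanism the prototype relies on can be sketched in a few lines: WebAssembly.Memory is allocated in 64 KiB pages, grow() extends the backing (Shared)ArrayBuffer, and a RangeError is thrown once the maximum (or a platform limit) is reached, which is the failure mode documented below.

```javascript
// Minimal sketch: a shared WebAssembly.Memory grown one 64 KiB page at a time.
const memory = new WebAssembly.Memory({ initial: 1, maximum: 2, shared: true });
const before = memory.buffer.byteLength; // 1 page = 65536 bytes
memory.grow(1); // extend by one page
const after = memory.buffer.byteLength; // 2 pages = 131072 bytes
let error;
try {
  memory.grow(1); // past maximum (or past what the platform can allocate)
} catch (e) {
  error = e; // RangeError, as in the logs further down
}
```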

From wasmerio/wasmer-php#121 (comment)

Put together a minimal, complete, verifiable working example of streaming from fetch() to AudioWorkletProcessor via Native Messaging, including the observable restrictions which led to filing this issue and the issues referenced above: specifically, RangeError being thrown, and limits on the amount of memory that can be pre-allocated as an ArrayBuffer or SharedArrayBuffer.

Posting the code here because the ongoing experiments not infrequently crash the browser and/or the OS.

Flow chart

  • Create Native Messaging Chromium app (app)
  • At an arbitrary web page, dynamically set permissions for the app by using Native File System to re-write manifest.json in the app/ folder, setting the externally_connectable matches value
  • Connect to the background script, background.js, which passes messages between the Native Messaging host (in this case a bash script that executes PHP) and the arbitrary web page
  • We do not use the Native Messaging protocol to pass STDOUT to background.js and on to the web page; instead we start a PHP built-in server process on localhost to handle the subsequent fetch() call (with an AbortController signal set in the request init), reading STDOUT as a ReadableStream and converting to and from JSON and Uint8Array
  • Create a MediaStreamAudioDestinationNode for the ability to record, perform other tasks with MediaStream, and MediaStreamTrack of live audio stream of system audio capture
  • Use Transferable Streams to post response.body (ReadableStream) to AudioWorkletProcessor, where we pipe the stream to WritableStream and write Uint8Arrays to a single Uint8Array pre-allocated to a set amount (344 * 512 * 60 * 60 throws cannot allocate memory, here)
  • Store minimal data before process() is started by resuming AudioContext
  • Pass message to background.js to kill PHP built-in server process
  • Close ReadableStreamDefaultController
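The JSON and Uint8Array conversion in the flow above exists because Port.postMessage() between the extension and the page clones rather than transfers; the round trip amounts to:

```javascript
// Port.postMessage() cannot transfer an ArrayBuffer, so each chunk is serialized
// as JSON in background.js and restored to a Uint8Array at the web page.
const chunk = Uint8Array.from([1, 2, 3]);
const message = JSON.stringify(chunk); // typed arrays stringify as {"0":1,"1":2,"2":3}
const restored = Uint8Array.from(Object.values(JSON.parse(message)));
```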

Chromium Native Messaging extension.

app/

manifest.json

{
  // Extension ID: <id>
  "key": "<key>",
  "name": "native_messaging_stream",
  "version": "1.0",
  "manifest_version": 2,
  "description": "Capture system audio",
  "icons": {},
  "permissions": [
    "nativeMessaging", "*://localhost/"
  ],
  "app": {
    "background": {
      "scripts": [
        "background.js"
      ],
      "persistent": false
    }
  },

  "externally_connectable": {
    "matches": [
      "https://developer.mozilla.org/*",
      "http://localhost/*"
    ],
    "ids": [
      "lmkllhfhgnmbcmhdagfhejkpicehnida"
    ]
  },
  "author": "guest271314"
}

background.js

const id = 'native_messaging_stream';
let externalPort, controller, signal;

chrome.runtime.onConnectExternal.addListener(port => {
  console.log(port);
  externalPort = port;
  externalPort.onMessage.addListener(message => {
    if (message === 'start') {
      chrome.runtime.sendNativeMessage(id, {}, async _ => {
        console.log(_);
        if (chrome.runtime.lastError) {
          console.warn(chrome.runtime.lastError.message);
        }
        controller = new AbortController();
        signal = controller.signal;
        // wait until bash script completes, server starts
        for await (const _ of (async function* stream() {
          while (true) {
            try {
              if ((await fetch('http://localhost:8000', { method: 'HEAD' })).ok)
                break;
            } catch (e) {
              console.warn(e.message);
              yield;
            }
          }
        })());
        try {
          const response = await fetch('http://localhost:8000?start=true', {
            cache: 'no-store',
            mode: 'cors',
            method: 'get',
            signal
          });
          console.log(...response.headers);
          const readable = response.body;
          readable
            .pipeTo(
              new WritableStream({
                write: async value => {
                  // value is a Uint8Array, postMessage() here only supports cloning, not transfer
                  externalPort.postMessage(JSON.stringify(value));
                },
              })
            )
            .catch(err => {
              console.warn(err);
              externalPort.postMessage('done');
            });
        } catch (err) {
          console.error(err);
        }
      });
    }
    if (message === 'stop') {
      controller.abort();
      chrome.runtime.sendNativeMessage(id, {}, _ => {
        if (chrome.runtime.lastError) {
          console.warn(chrome.runtime.lastError.message);
        }
        console.log('everything should be done');
      });
    }
  });
});

set_externally_connectable.js

(async(set_externally_connectable = ["https://example.com/*"], unset_externally_connectable = true) => {
  const dir = await self.showDirectoryPicker();
  const status = await dir.requestPermission({writable: true});
  const fileHandle = await dir.getFileHandle("manifest.json", {create: false});
  const file = await fileHandle.getFile();
  const manifest_text = await file.text();
  const match_extension_id = /\/\/ Extension ID: \w{32}/;
  const [extension_id] = manifest_text.match(match_extension_id);
  let text = manifest_text.replace(match_extension_id, `"_": 0,`);
  const manifest_json = JSON.parse(text);
  manifest_json.externally_connectable.matches = unset_externally_connectable ? set_externally_connectable :
    [...manifest_json.externally_connectable.matches, ...set_externally_connectable];
  const writer = await fileHandle.createWritable({keepExistingData:false});
  await writer.write(JSON.stringify(manifest_json, null, 2).replace(/"_": 0,/, extension_id)); 
  return await writer.close();
})([`${location.origin}/*`]);

host/

native_messaging_stream.json ($ cp native_messaging_stream.json ~/.config/chromium/NativeMessagingHosts)

{
  "name": "native_messaging_stream",
  "description": "Capture system audio",
  "path": "/path/to/host/captureSystemAudio.sh",
  "type": "stdio",
  "allowed_origins": [
    "chrome-extension://<id>/"
  ],
  "author": "guest271314"
}

index.php

<?php 
  if($_SERVER['REQUEST_METHOD'] == 'HEAD') {
    header('Vary: Origin');
    header("Access-Control-Allow-Origin: chrome-extension://lmkllhfhgnmbcmhdagfhejkpicehnida");
    header("Access-Control-Allow-Methods: GET, OPTIONS, HEAD");
    header("Access-Control-Allow-Headers: Content-Type, Access-Control-Allow-Headers");    
    header("X-Powered-By:");
    header("HTTP/1.1 200 OK");
    die();
  }
  if (isset($_GET["start"])) {
    header('Vary: Origin');
    header("Access-Control-Allow-Origin: chrome-extension://lmkllhfhgnmbcmhdagfhejkpicehnida");
    header("Access-Control-Allow-Methods: GET, OPTIONS, HEAD");
    header("Access-Control-Allow-Headers: Content-Type, Access-Control-Allow-Headers");    
    header("Content-Type: text/plain");
    header("X-Powered-By:");
    echo passthru("parec -v --raw -d alsa_output.pci-0000_00_1b.0.analog-stereo.monitor");
    exit();
  }
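For reference, parec with the default sample specification emits signed 16-bit little-endian stereo at 44100 Hz, which is where the 344 * 512 bytes-per-second figure used in the pre-allocation estimates above comes from:

```javascript
// One second of raw PCM from parec's default format (s16le, 2 channels, 44100 Hz)
const bytesPerSecond = 44100 * 2 /* channels */ * 2 /* bytes per sample */; // 176400
// the 344 * 512 figure is a close power-of-two-friendly approximation
const estimate = 344 * 512; // 176128
```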

captureSystemAudio.sh

#!/bin/bash
sendMessage() {
    # https://stackoverflow.com/a/24777120
    message='{"message": "ok"}'
    # Calculate the byte size of the string.
    # NOTE: This assumes that byte length is identical to the string length!
    # Do not use multibyte (unicode) characters, escape them instead, e.g.
    # message='"Some unicode character:\u1234"'
    messagelen=${#message}
    # Convert to an integer in native byte order.
    # If you see an error message in Chrome's stdout with
    # "Native Messaging host tried sending a message that is ... bytes long.",
    # then just swap the order, i.e. messagelen1 <-> messagelen4 and
    # messagelen2 <-> messagelen3
    messagelen1=$(( ($messagelen      ) & 0xFF ))               
    messagelen2=$(( ($messagelen >>  8) & 0xFF ))               
    messagelen3=$(( ($messagelen >> 16) & 0xFF ))               
    messagelen4=$(( ($messagelen >> 24) & 0xFF ))               
    # Print the message byte length followed by the actual message.
    printf "$(printf '\\x%x\\x%x\\x%x\\x%x' \
        $messagelen1 $messagelen2 $messagelen3 $messagelen4)%s" "$message"
}

captureSystemAudio() {
  if pgrep -f php > /dev/null; then
    killall -9 php & sendMessage  
  else
    php -S localhost:8000 -t /path/to/host/ & sendMessage
  fi
}
captureSystemAudio
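The length prefix the shell script builds with printf is the Native Messaging wire format: a 32-bit message length in native (here little-endian) byte order followed by the UTF-8 JSON payload. A JavaScript sketch of the same framing:

```javascript
// Frame a message the way captureSystemAudio.sh does in shell:
// 4-byte little-endian length, then the JSON payload bytes.
function frameNativeMessage(message) {
  const payload = new TextEncoder().encode(JSON.stringify(message));
  const framed = new Uint8Array(4 + payload.length);
  new DataView(framed.buffer).setUint32(0, payload.length, true); // little-endian
  framed.set(payload, 4);
  return framed;
}
```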

At the arbitrary web page, in this case MDN, where we previously set the matching URL pattern in externally_connectable for permission to communicate with the Chromium Native Messaging application:

var id = 'lmkllhfhgnmbcmhdagfhejkpicehnida';
var port = chrome.runtime.connect(id);
var controller;
var readable = new ReadableStream({
  start(c) {
    return (controller = c);
  },
});
var init = false;
port.onMessage.addListener(async message => {
  if (message !== 'done') {
    if (!init) {
      init = true;
      // stub so the derived class can be defined here, then stringified;
      // the real AudioWorkletProcessor exists in AudioWorkletGlobalScope
      class AudioWorkletProcessor {}
      class AudioWorkletNativeMessageStream extends AudioWorkletProcessor {
        constructor(options) {
          super(options);
          this.byteSize = 512 * 384 * 60 * 1; // ~1 minute of data
          this.memory = new Uint8Array(this.byteSize); // TODO: grow memory, dynamically
          Object.assign(this, options.processorOptions);
          this.port.onmessage = this.appendBuffers.bind(this);
        }
        async appendBuffers({ data: readable }) {
          Object.assign(this, { readable });
          const source = {
            write: (value, controller) => {
              console.log(globalThis.currentTime % 3);
              if (this.totalBytes + value.byteLength < this.byteSize) {
                this.memory.set(value, this.readOffset);
                this.readOffset = this.readOffset + value.buffer.byteLength;
                this.totalBytes = this.readOffset;
              } else {
                const last = value.subarray(0, this.byteSize - this.totalBytes);
                this.memory.set(last, this.readOffset);
                this.readOffset = this.readOffset + last.length;
                this.totalBytes = this.readOffset;
                console.log(
                  value.buffer.byteLength,
                  this.readOffset,
                  this.totalBytes
                );
                controller.close();

              }
              // await 250 milliseconds of audio data
              if (this.totalBytes >= (512 * 384) / 4 && !this.started) {
                this.started = true;
                this.port.postMessage({ started: this.started });
              }
            },
            close() {
              console.log('stopped');
            },
          };
          try {
            await this.readable.pipeTo(new WritableStream(source));
          } catch (e) {
            console.warn(e);
            console.log(this.writeOffset, this.totalBytes);
            this.endOfStream();
          }
        }
        endOfStream() {
          this.port.postMessage({
            ended: true,
            currentTime,
            currentFrame,
            readOffset: this.readOffset,
            writeOffset: this.writeOffset,
            totalBytes: this.totalBytes,
          });
        }
        process(inputs, outputs) {
          const channels = outputs.flat();
          if (
            this.writeOffset >= this.totalBytes ||
            this.totalBytes === this.byteSize
          ) {
            console.log(this);
            this.endOfStream();
            return false;
          }

          const uint8 = new Uint8Array(512);
          try {
            for (let i = 0; i < 512; i++, this.writeOffset++) {
              if (this.writeOffset === this.byteSize) {
                break;
              }
              uint8[i] = this.memory[this.writeOffset];
            }
            const uint16 = new Uint16Array(uint8.buffer);
            // https://stackoverflow.com/a/35248852
            for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
              const int = uint16[i];
              // If the high bit is on, then it is a negative number, and actually counts backwards.
              const float =
                int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
              // interleave
              channels[(n = ++n % 2)][!n ? j++ : j - 1] = float;
            }
            // console.log(channels[0]);
          } catch (e) {
            console.error(e);
          }

          return true;
        }
      }
      // register processor in AudioWorkletGlobalScope
      function registerProcessor(name, processorCtor) {
        return `${processorCtor};\nregisterProcessor('${name}', ${processorCtor.name});`;
      }
      const worklet = URL.createObjectURL(
        new Blob(
          [
            registerProcessor(
              'audio-worklet-native-message-stream',
              AudioWorkletNativeMessageStream
            ),
          ],
          { type: 'text/javascript' }
        )
      );
      const ac = new AudioContext({
        latencyHint: 1,
        sampleRate: 44100,
        numberOfChannels: 2,
      });
      ac.onstatechange = e => console.log(ac.state);
      if (ac.state === 'running') {
        await ac.suspend();
      }
      await ac.audioWorklet.addModule(worklet);
      const aw = new AudioWorkletNode(ac, 'audio-worklet-native-message-stream', {
        numberOfInputs: 1,
        numberOfOutputs: 2,
        channelCount: 2,
        processorOptions: {
          totalBytes: 0,
          readOffset: 0,
          writeOffset: 0,
          done: false,
          ended: false,
          started: false,
        },
      });

      aw.onprocessorerror = e => {
        console.error(e);
        console.trace();
      };

      const msd = new MediaStreamAudioDestinationNode(ac);
      const { stream } = msd;
      const [track] = stream.getAudioTracks();
      aw.connect(msd);
      const recorder = new MediaRecorder(stream);
      recorder.ondataavailable = e => console.log(URL.createObjectURL(e.data));
      aw.port.onmessage = async e => {
        console.log(e.data);
        if (
          e.data.started &&
          ac.state === 'suspended' &&
          recorder.state === 'inactive'
        ) {
          recorder.start();
          await ac.resume();
          setTimeout(_ => {
            port.postMessage('stop');
            controller.close();
          }, 1000 * 60 * 1);

        } else if (recorder.state === 'recording') {
          recorder.stop();
          track.stop();
          aw.disconnect();
          msd.disconnect();
          await ac.close();
          console.log(track);
          gc();
        }
      };

      aw.port.postMessage(readable, [readable]);

    }
    if (readable.locked)
      controller.enqueue(Uint8Array.from(Object.values(JSON.parse(message))));
  } else {
    if (readable.locked) controller.close();
  }
});

port.postMessage('start');
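The conversion loop inside process() can be factored out and checked in isolation. This sketch applies the same signed 16-bit to float mapping and de-interleaves into two planar channels (the function name is illustrative):

```javascript
// Interleaved 16-bit samples (read as Uint16Array) mapped to [-1, 1] floats
// and de-interleaved into [left, right] planar Float32Arrays.
function int16ToPlanarFloat(uint16) {
  const frames = uint16.length / 2;
  const channels = [new Float32Array(frames), new Float32Array(frames)];
  for (let i = 0; i < uint16.length; i++) {
    const int = uint16[i];
    // If the high bit is set, the value is negative and counts backwards
    const float = int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
    channels[i % 2][Math.floor(i / 2)] = float;
  }
  return channels;
}
```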

A version using WebAssembly.Memory.grow() to dynamically grow memory during the stream. The expected result is 30 minutes of audio being written to the SharedArrayBuffer.

Testing on a legacy 4.15.0-20-lowlatency 32-bit kernel, attempting to grow memory to 30 minutes of data with the maximum set to 1 hour of data, an error was thrown when calling grow(1) at a current offset of 177143808:

before grow 177012736
after grow 177078272
before grow 177078272
after grow 177143808
before grow 177143808
e4056867-5209-4bea-8f76-a86106ab83af:75 RangeError: WebAssembly.Memory.grow(): Unable to grow instance memory.
    at Memory.grow (<anonymous>)
    at Object.write (e4056867-5209-4bea-8f76-a86106ab83af:26)
appendBuffers @ e4056867-5209-4bea-8f76-a86106ab83af:75
async function (async)
appendBuffers @ e4056867-5209-4bea-8f76-a86106ab83af:73
177111040 177139656
{ended: true, currentTime: 1004.0308390022676, currentFrame: 44277760, readOffset: 177139656, writeOffset: 177111040, …}
MediaStreamTrack {kind: "audio", id: "e0958602-f0ea-4565-9e34-f624afce7c12", label: "MediaStreamAudioDestinationNode", enabled: true, muted: false, …}
blob:https://developer.mozilla.org/f9d716ff-c65e-4ddf-b7c7-5e385d0602ec
closed

resulting in 16 minutes and 43 seconds of data being written to memory, observable in the recorded audio

Screenshot_2020-08-23_21-16-22

Asked whether 1 GB is the maximum the underlying memory can grow to on 32-bit architectures at https://bugs.chromium.org/p/v8/issues/detail?id=7881.

if (globalThis.gc) gc();
var id = 'lmkllhfhgnmbcmhdagfhejkpicehnida';
var port = chrome.runtime.connect(id);
var controller;
var readable = new ReadableStream({
  start(c) {
    return (controller = c);
  },
});
var init = false;
port.onMessage.addListener(async message => {
  if (message !== 'done') {
    if (!init) {
      init = true;
      // stub so the derived class can be defined here, then stringified;
      // the real AudioWorkletProcessor exists in AudioWorkletGlobalScope
      class AudioWorkletProcessor {}
      class AudioWorkletNativeMessageStream extends AudioWorkletProcessor {
        constructor(options) {
          super(options);
          this.initial = (384 * 512) / 65536; // 3 pages, ~1 second of data
          this.maximum = (384 * 512 * 60 * 60) / 65536; // 1 hour
          this.byteSize = this.maximum * 65536;
          this.memory = new WebAssembly.Memory({
            initial: this.initial,
            maximum: this.maximum,
            shared: true,
          });
          Object.assign(this, options.processorOptions);
          this.port.onmessage = this.appendBuffers.bind(this);
        }
        async appendBuffers({ data: readable }) {
          Object.assign(this, { readable });
          
          const source = {
            write: (value, controller) => {
              if (
                this.totalBytes + value.byteLength >
                  this.memory.buffer.byteLength &&
                this.totalBytes + value.buffer.byteLength < this.byteSize
              ) {
                console.log('before grow', this.memory.buffer.byteLength);
                this.memory.grow(1);
                console.log('after grow', this.memory.buffer.byteLength);
              }

              const uint8_sab = new Uint8Array(this.memory.buffer, this.readOffset);
              // console.log(this.totalBytes, this.totalBytes % 65536, this.totalBytes / 65536, this.memory.buffer.byteLength);
              if (this.totalBytes + value.buffer.byteLength < this.byteSize) {
                

                for (
                  let i = 0;
                  i < value.buffer.byteLength;
                  i++, this.readOffset++
                ) {
                  uint8_sab[this.readOffset] = value[i];
                }

                this.totalBytes = this.readOffset;
              } else {
                const lastBytes = value.subarray(0, this.byteSize - this.totalBytes);
                // const uint8 = new Uint8Array(this.memory.buffer, this.readOffset);
                for (
                  let i = 0;
                  i < lastBytes.buffer.byteLength;
                  i++, this.readOffset++
                ) {
                  uint8_sab[this.readOffset] = value[i];
                }
                this.totalBytes = this.readOffset;
                console.log(
                  value.buffer.byteLength,
                  this.readOffset,
                  this.totalBytes
                );
                this.readable.cancel();
              }
              // accumulate 250 milliseconds of data before resuming AudioContext
              if (this.totalBytes >= (512 * 384) / 4 && !this.started) {
                this.started = true;
                this.port.postMessage({ started: this.started });
              }
            },
            close() {
              console.log('stream closed');
            },
          };
          try {
            await this.readable.pipeTo(new WritableStream(source));
          } catch (e) {
            console.warn(e);
            console.log(this.writeOffset, this.totalBytes);
            this.endOfStream();
          }
        }
        endOfStream() {
          this.port.postMessage({
            ended: true,
            currentTime,
            currentFrame,
            readOffset: this.readOffset,
            writeOffset: this.writeOffset,
            totalBytes: this.totalBytes,
          });
        }
        process(inputs, outputs) {
          const channels = outputs.flat();
          if (
            this.writeOffset >= this.totalBytes ||
            this.totalBytes === this.byteSize
          ) {
            console.log(this);
            this.endOfStream();
            return false;
          }

          const uint8 = new Uint8Array(512);
          const uint8_sab = new Uint8Array(this.memory.buffer, this.writeOffset); // .slice(this.writeOffset, this.writeOffset + 512)
          
          try {
            for (let i = 0; i < 512; i++, this.writeOffset++) {
              if (this.writeOffset === this.byteSize) {
                break;
              }
              uint8[i] = uint8_sab[this.writeOffset];
            }

            const uint16 = new Uint16Array(uint8.buffer);
            // https://stackoverflow.com/a/35248852
            for (let i = 0, j = 0, n = 1; i < uint16.length; i++) {
              const int = uint16[i];
              // If the high bit is on, then it is a negative number, and actually counts backwards.
              const float =
                int >= 0x8000 ? -(0x10000 - int) / 0x8000 : int / 0x7fff;
              // interleave
              channels[(n = ++n % 2)][!n ? j++ : j - 1] = float;
            }
            // console.log(channels[0]);
          } catch (e) {
            console.error(e);
          }

          return true;
        }
      }
      // register processor in AudioWorkletGlobalScope
      function registerProcessor(name, processorCtor) {
        return `${processorCtor};\nregisterProcessor('${name}', ${processorCtor.name});`;
      }
      const worklet = URL.createObjectURL(
        new Blob(
          [
            registerProcessor(
              'audio-worklet-native-message-stream',
              AudioWorkletNativeMessageStream
            ),
          ],
          { type: 'text/javascript' }
        )
      );
      const ac = new AudioContext({
        latencyHint: 1,
        sampleRate: 44100,
        numberOfChannels: 2,
      });
      ac.onstatechange = e => console.log(ac.state);
      if (ac.state === 'running') {
        await ac.suspend();
      }
      await ac.audioWorklet.addModule(worklet);
      const aw = new AudioWorkletNode(
        ac,
        'audio-worklet-native-message-stream',
        {
          numberOfInputs: 1,
          numberOfOutputs: 2,
          channelCount: 2,
          processorOptions: {
            totalBytes: 0,
            readOffset: 0,
            writeOffset: 0,
            done: false,
            ended: false,
            started: false,
          },
        }
      );

      aw.onprocessorerror = e => {
        console.error(e);
        console.trace();
      };

      const msd = new MediaStreamAudioDestinationNode(ac);
      const { stream } = msd;
      const [track] = stream.getAudioTracks();
      aw.connect(msd);
      const recorder = new MediaRecorder(stream);
      recorder.ondataavailable = e => console.log(URL.createObjectURL(e.data));
      aw.port.onmessage = async e => {
        console.log(e.data);
        if (
          e.data.started &&
          ac.state === 'suspended' &&
          recorder.state === 'inactive'
        ) {
          recorder.start();
          await ac.resume();
          setTimeout(_ => {
            port.postMessage('stop');
            controller.close();
          }, 1000 * 60 * 30);
        } else if (recorder.state === 'recording') {
          recorder.stop();
          track.stop();
          aw.disconnect();
          msd.disconnect();
          await ac.close();
          console.log(track);
          gc();
        }
      };

      aw.port.postMessage(readable, [readable]);
    }
    if (readable.locked)
      controller.enqueue(Uint8Array.from(Object.values(JSON.parse(message))));
  } else {
    if (readable.locked) controller.close();
  }
});

port.postMessage('start');

Ideally, in this case, we should be able to stream directly to AudioWorkletProcessor.process() instead of writing to memory, i.e., the gist of the proposal at Chromium: Use Streams API for STDIN and STDOUT, flushing memory as we proceed.

One approach to reducing memory usage would be to write Opus audio to memory and decode it to raw PCM as Float32Array values for the outputs channels, which I have not yet tried.

Describe the feature in more detail

Internally RawPCMStreamNode (or simply StreamNode or InputStreamNode) is a TransformStream that accepts a ReadableStream (readable) of raw PCM, transforms the raw data (the terms "interleaved" and "planar" are no longer included in Web Audio API v1) into Float32Array(s) corresponding to the number of channels contained in the input stream and sets the Float32Array(s) as outputs in process().
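A hypothetical sketch of those internals, assuming 16-bit interleaved input; the names here are illustrative only, not a proposed API surface:

```javascript
// Convert one Uint8Array chunk of interleaved signed 16-bit PCM to floats in [-1, 1)
function chunkToFloats(chunk) {
  const int16 = new Int16Array(chunk.buffer, chunk.byteOffset, chunk.byteLength / 2);
  const floats = new Float32Array(int16.length);
  for (let i = 0; i < int16.length; i++) floats[i] = int16[i] / 0x8000;
  return floats;
}

// The node would pipe its input ReadableStream through a transform like this,
// then split the floats per channel and set them as outputs in process().
const pcmTransform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunkToFloats(chunk));
  },
});
```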

Externally, this will work in the same way as MediaElementAudioSourceNode:

<body>
  <audio controls autoplay crossOrigin="anonymous"></audio>
  <script>
    (async() => {
      const context = new AudioContext({latencyHint:1.0});
      await context.suspend();
      const mediaElement = document.querySelector("audio");
      mediaElement.onloadedmetadata = async e => await context.resume();
      const source = new MediaElementAudioSourceNode(context, {
        mediaElement
      });
      await context.audioWorklet.addModule("bypass-processor.js");
      const bypasser = new AudioWorkletNode(context, "bypass-processor");
      source.connect(bypasser);
      bypasser.connect(context.destination);
      mediaElement.src = "https://ia800301.us.archive.org/10/items/DELTAnine2013-12-11.WAV/Deltanine121113Pt3Wav.wav";
    })();
  </script>
</body>

where, when connected to an AudioWorkletNode, AudioWorkletProcessor.process() effectively takes input from the src of the HTMLMediaElement and the resulting audio pipeline, and outputs Float32Arrays at outputs.

This will provide an entry point for users to utilize AudioContext for non-standard or non-implemented device capture and processing with Web Audio API.

WebCodecs does not solve this - unless that solution can be unequivocally proven here by evidence, not conjecture.

Related art:

Related issue:
