Source code for pytheory.tuner

"""Real-time instrument tuner.

Listens to your microphone, tracks pitch with the same YIN algorithm
behind :func:`pytheory.audio.detect_pitch`, and tells you what note
you're playing and how many cents sharp or flat you are.

Terminal::

    $ pytheory tune
    $ pytheory tune --instrument guitar   # lock to the six strings

Browser / JavaScript::

    $ pytheory tune --serve
    # opens http://localhost:8123 — a strobe tuner page

The served page is a strobe tuner: a segmented disc that drifts
clockwise when you're sharp, counter-clockwise when you're flat,
and freezes when you're in tune — the same display logic as a
Peterson strobe, driven by the YIN pitch track.

The server speaks Server-Sent Events *and* WebSocket, so any web
page or JS app can tap the pitch stream directly — no client
library needed::

    // SSE
    const tuner = new EventSource("http://localhost:8123/stream");
    tuner.onmessage = (e) => {
        const { freq, note, octave, cents, in_tune } = JSON.parse(e.data);
    };

    // WebSocket
    const ws = new WebSocket("ws://localhost:8123/ws");
    ws.onmessage = (e) => { const reading = JSON.parse(e.data); };

CORS is wide open on the stream, so a page served from anywhere
(your dev server, a file:// page, CodePen) can connect.

With an instrument preset, readings lock to the nearest string —
the ``target`` field says which one — so "tune the D string" never
gets misread as "you're 80 cents flat of E"::

    tuner = Tuner(instrument="guitar")

With ``chords=True`` (CLI: ``pytheory tune --chords``) the tuner
also identifies what chord is sounding — strum and the page (and
the stream's ``chord`` field) names it::

    $ pytheory tune --serve --chords
"""

import json
import threading
import time

import numpy

from .audio import detect_pitch

SAMPLE_RATE = 44_100

_NOTE_NAMES = ['C', 'C#', 'D', 'D#', 'E', 'F',
               'F#', 'G', 'G#', 'A', 'A#', 'B']

#: Open-string targets for common instruments (low to high).
INSTRUMENT_STRINGS = {
    "guitar":   ["E2", "A2", "D3", "G3", "B3", "E4"],
    "bass":     ["E1", "A1", "D2", "G2"],
    "ukulele":  ["G4", "C4", "E4", "A4"],
    "violin":   ["G3", "D4", "A4", "E5"],
    "viola":    ["C3", "G3", "D4", "A4"],
    "cello":    ["C2", "G2", "D3", "A3"],
    "mandolin": ["G3", "D4", "A4", "E5"],
    "banjo":    ["D3", "G3", "B3", "D4", "G4"],
}


def _note_to_freq(name, reference_pitch=440.0):
    """Note name with octave ("E2", "C#4") → frequency in Hz."""
    midi = _NOTE_NAMES.index(name[:-1]) + (int(name[-1]) + 1) * 12
    return reference_pitch * 2.0 ** ((midi - 69) / 12.0)


[docs] def string_targets(instrument, reference_pitch=440.0): """The (name, frequency) tuning targets for an instrument preset. >>> string_targets("guitar")[0] ('E2', 82.4068...) """ names = INSTRUMENT_STRINGS[instrument] return [(n, _note_to_freq(n, reference_pitch)) for n in names]
[docs] def analyze_frame(frame, sample_rate=SAMPLE_RATE, *, reference_pitch=440.0, fmin=50.0, fmax=1500.0, targets=None): """Analyze one audio frame: what note is this, and how far off? Args: frame: Mono float array (≥ ~2048 samples for reliable results). sample_rate: Sample rate in Hz. reference_pitch: Concert pitch for A4 (default 440; pass 442 for orchestras that tune high, 432 for the adventurous). fmin/fmax: Pitch search range. targets: Optional list of (name, freq) tuning targets (e.g. from :func:`string_targets`). The reading then reports cents relative to the *nearest target* instead of the nearest chromatic note, with the matched name in ``target``. Returns: Dict with ``freq``, ``note``, ``octave``, ``cents`` (signed, + is sharp), and ``in_tune`` (within ±5 cents) — or ``None`` if the frame has no confident pitch. With targets, also ``target`` and ``target_freq``. """ frame = numpy.asarray(frame, dtype=numpy.float64) _, freqs, voiced = detect_pitch(frame, sample_rate, frame_size=min(len(frame), 4096), hop=len(frame), fmin=fmin, fmax=fmax) if not voiced.any(): return None freq = float(freqs[voiced][0]) if targets: name, target_freq = min( targets, key=lambda t: abs(numpy.log2(freq / t[1]))) cents = 1200.0 * float(numpy.log2(freq / target_freq)) return { "freq": round(freq, 2), "note": name[:-1], "octave": int(name[-1]), "cents": round(cents, 1), "in_tune": bool(abs(cents) < 5.0), "target": name, "target_freq": round(target_freq, 2), } midi_float = 69 + 12 * float(numpy.log2(freq / reference_pitch)) nearest = int(round(midi_float)) cents = (midi_float - nearest) * 100.0 return { "freq": round(freq, 2), "note": _NOTE_NAMES[nearest % 12], "octave": nearest // 12 - 1, "cents": round(cents, 1), "in_tune": bool(abs(cents) < 5.0), # plain bool — JSON-safe }
[docs] class Tuner: """Microphone-driven tuner — keeps the latest pitch reading. Example:: tuner = Tuner() tuner.start() while True: reading = tuner.reading # dict or None, updated live """ def __init__(self, *, reference_pitch=440.0, fmin=50.0, fmax=1500.0, device=None, sample_rate=SAMPLE_RATE, instrument=None, chords=False): self.reference_pitch = reference_pitch self.fmin = fmin self.fmax = fmax self.device = device self.sample_rate = sample_rate self.instrument = instrument self.targets = None if instrument: if instrument not in INSTRUMENT_STRINGS: raise ValueError( f"Unknown instrument {instrument!r} — choose from: " + ", ".join(sorted(INSTRUMENT_STRINGS))) self.targets = string_targets(instrument, reference_pitch) # Widen the search range around the strings (bass E1 is # 41 Hz, below the chromatic default) self.fmin = min(self.fmin, self.targets[0][1] * 0.7) self.fmax = min(self.fmax, self.targets[-1][1] * 2.5) self.chords = chords self.chord = None # latest chord dict (or None) self.reading = None # latest analysis dict (or None) # Chord ID needs ~1s of audio for a stable chromagram; the # pitch tracker only ever looks at the newest 4096 samples. self._buf_len = sample_rate if chords else 4096 self._buf = numpy.zeros(self._buf_len, dtype=numpy.float64) self._lock = threading.Lock() self._stream = None def _callback(self, indata, frames, time_info, status): mono = indata[:, 0].astype(numpy.float64) with self._lock: self._buf = numpy.concatenate([self._buf, mono])[-self._buf_len:]
[docs] def start(self): """Open the microphone and start analyzing.""" import sounddevice as sd self._stream = sd.InputStream( samplerate=self.sample_rate, channels=1, blocksize=1024, device=self.device, callback=self._callback) self._stream.start() self._analyzing = True t = threading.Thread(target=self._analyze_loop, daemon=True) t.start() return self
def _analyze_loop(self): from .audio import identify_chord tick = 0 while self._analyzing: with self._lock: frame = self._buf.copy() self.reading = analyze_frame( frame[-4096:], self.sample_rate, reference_pitch=self.reference_pitch, fmin=self.fmin, fmax=self.fmax, targets=self.targets) # Chord ID is heavier (1s chromagram) — run at 5 Hz if self.chords and tick % 4 == 0: self.chord = identify_chord(frame, self.sample_rate) tick += 1 time.sleep(1 / 20)
[docs] def stop(self): self._analyzing = False if self._stream: self._stream.stop() self._stream.close() self._stream = None
_TUNER_PAGE = """<!doctype html> <html><head><meta charset="utf-8"><title>PyTheory Tuner</title> <style> body { background:#111; color:#eee; font-family:-apple-system,system-ui,sans-serif; display:flex; flex-direction:column; align-items:center; justify-content:center; min-height:100vh; margin:0; } #note { font-size:6rem; font-weight:700; line-height:1; } #note small { font-size:2.2rem; color:#888; } #freq { color:#888; font-size:1.1rem; margin-top:.3rem; } #strobe { margin-top:1.2rem; } #strings { display:flex; gap:.5rem; margin-top:1rem; } #strings span { padding:.3rem .8rem; border-radius:6px; background:#222; color:#888; font-size:1.1rem; border:1px solid #333; } #strings span.hot { background:#2a2a2a; color:#eee; border-color:#e74c3c; } .ok #strings span.hot { border-color:#2ecc71; color:#2ecc71; } #meter { width:min(80vw,440px); height:8px; background:#333; border-radius:4px; margin-top:1.4rem; position:relative; } #meter::after { content:""; position:absolute; left:50%; top:-5px; width:2px; height:18px; background:#666; } #needle { position:absolute; top:-3px; width:6px; height:14px; border-radius:3px; background:#e74c3c; left:50%; transition:left .08s linear, background .08s; } #cents { margin-top:.8rem; font-size:1.2rem; color:#888; } #chordsym { min-height:3.6rem; font-size:2.6rem; font-weight:600; color:#f1c40f; } #chordsym small { font-size:1.2rem; color:#888; margin-left:.6rem; } .ok #needle { background:#2ecc71; } .ok #note { color:#2ecc71; } </style></head><body> <div id="chordsym"></div> <div id="note">&mdash;</div> <div id="freq"></div> <canvas id="strobe" width="340" height="340"></canvas> <div id="strings"></div> <div id="meter"><div id="needle"></div></div> <div id="cents"></div> <script> const CONFIG = __CONFIG__; // String buttons for instrument presets const stringsDiv = document.getElementById("strings"); if (CONFIG.strings) { for (const s of CONFIG.strings) { const el = document.createElement("span"); el.textContent = s; el.id = "str-" + s; stringsDiv.appendChild(el); } } let reading = null; const es = new EventSource("/stream"); es.onmessage = (e) => { const d = JSON.parse(e.data); if (CONFIG.chords) { const el = document.getElementById("chordsym"); if (d && d.chord) { el.innerHTML = d.chord + "<small>" + (d.chord_notes || []).join(" ") + "</small>"; } else { el.textContent = ""; } } reading = (d && d.note) ? d : null; if (!reading) { document.getElementById("note").innerHTML = "&mdash;"; document.getElementById("freq").textContent = ""; document.getElementById("cents").textContent = "listening\\u2026"; document.body.classList.remove("ok"); return; } document.getElementById("note").innerHTML = d.note + "<small>" + d.octave + "</small>"; document.getElementById("freq").textContent = d.freq.toFixed(1) + " Hz"; const clamped = Math.max(-50, Math.min(50, d.cents)); document.getElementById("needle").style.left = "calc(" + (50 + clamped) + "% - 3px)"; document.getElementById("cents").textContent = (d.cents > 0 ? "+" : "") + d.cents.toFixed(1) + " cents"; document.body.classList.toggle("ok", d.in_tune); if (CONFIG.strings) { for (const s of CONFIG.strings) document.getElementById("str-" + s) .classList.toggle("hot", d.target === s); } }; // Strobe disc: segment pattern drifts clockwise when sharp, // counter-clockwise when flat, freezes when in tune. The inner // ring moves at half rate for fine reading near zero. const canvas = document.getElementById("strobe"); const ctx = canvas.getContext("2d"); const CX = canvas.width / 2, CY = canvas.height / 2; const RINGS = [ { n: 24, r0: 118, r1: 162, mult: 1.0 }, { n: 12, r0: 68, r1: 112, mult: 0.5 }, { n: 6, r0: 22, r1: 62, mult: 0.25 }, ]; let angle = 0, lastT = performance.now(); function draw(now) { const dt = Math.min((now - lastT) / 1000, 0.1); lastT = now; const active = reading !== null; if (active) angle += reading.cents * dt * 0.06 * 2 * Math.PI; ctx.clearRect(0, 0, canvas.width, canvas.height); const inTune = active && reading.in_tune; ctx.fillStyle = !active ? "#2a2a2a" : (inTune ? "#2ecc71" : "#e74c3c"); for (const ring of RINGS) { const seg = 2 * Math.PI / ring.n; for (let i = 0; i < ring.n; i++) { const a0 = angle * ring.mult + i * seg; ctx.beginPath(); ctx.arc(CX, CY, ring.r1, a0, a0 + seg / 2); ctx.arc(CX, CY, ring.r0, a0 + seg / 2, a0, true); ctx.closePath(); ctx.fill(); } } requestAnimationFrame(draw); } requestAnimationFrame(draw); </script></body></html>""" def _ws_frame(payload): """Wrap bytes in a single server-to-client WebSocket text frame.""" n = len(payload) if n < 126: header = bytes([0x81, n]) elif n < 65536: header = bytes([0x81, 126]) + n.to_bytes(2, "big") else: header = bytes([0x81, 127]) + n.to_bytes(8, "big") return header + payload
[docs] def serve(tuner, port=8123, open_browser=True): """Serve the tuner over HTTP: a live strobe page at ``/``, a Server-Sent Events pitch stream at ``/stream`` (CORS: any origin), and the same stream over WebSocket at ``/ws``. Blocks until Ctrl-C. """ from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer config = { "instrument": tuner.instrument, "strings": ([name for name, _ in tuner.targets] if tuner.targets else None), "reference_pitch": tuner.reference_pitch, "chords": bool(getattr(tuner, "chords", False)), } page = _TUNER_PAGE.replace("__CONFIG__", json.dumps(config)).encode() def reading_payload(): r = tuner.reading if not getattr(tuner, "chords", False): return r d = dict(r) if r else {} c = tuner.chord d["chord"] = c["symbol"] if c else None if c: d["chord_notes"] = c["notes"] d["chord_confidence"] = c["confidence"] return d class Handler(BaseHTTPRequestHandler): protocol_version = "HTTP/1.1" # WebSocket needs 1.1 def log_message(self, *args): pass def do_GET(self): if self.path == "/": self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(page))) self.end_headers() self.wfile.write(page) elif self.path == "/stream": self.send_response(200) self.send_header("Content-Type", "text/event-stream") self.send_header("Cache-Control", "no-cache") self.send_header("Connection", "close") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() self.close_connection = True try: while True: payload = json.dumps(reading_payload()) self.wfile.write(f"data: {payload}\n\n".encode()) self.wfile.flush() time.sleep(1 / 15) except (BrokenPipeError, ConnectionResetError): pass elif self.path == "/ws": self._serve_websocket() else: self.send_response(404) self.send_header("Content-Length", "0") self.end_headers() def _serve_websocket(self): import base64 import hashlib key = self.headers.get("Sec-WebSocket-Key") if not key: self.send_response(400) self.send_header("Content-Length", "0") self.end_headers() return accept = base64.b64encode(hashlib.sha1( (key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11").encode() ).digest()).decode() self.send_response(101, "Switching Protocols") self.send_header("Upgrade", "websocket") self.send_header("Connection", "Upgrade") self.send_header("Sec-WebSocket-Accept", accept) self.end_headers() self.close_connection = True try: while True: payload = json.dumps(reading_payload()).encode() self.wfile.write(_ws_frame(payload)) self.wfile.flush() time.sleep(1 / 15) except (BrokenPipeError, ConnectionResetError, OSError): pass server = ThreadingHTTPServer(("", port), Handler) url = f"http://localhost:{port}" print(f" PyTheory Tuner — {url}") if tuner.instrument: strings = " ".join(name for name, _ in tuner.targets) print(f" Instrument: {tuner.instrument} ({strings})") if getattr(tuner, "chords", False): print(" Chords: strum and the page names the chord") print(f" JS stream: {url}/stream (Server-Sent Events)") print(f" WebSocket: ws://localhost:{port}/ws") print(" Ctrl-C to stop.") if open_browser: import webbrowser webbrowser.open(url) try: server.serve_forever() except KeyboardInterrupt: print("\n Stopped.") finally: server.shutdown()
[docs] def run_terminal(tuner): """ASCII needle tuner in the terminal. Blocks until Ctrl-C.""" width = 61 center = width // 2 try: while True: r = tuner.reading if r: pos = int(numpy.clip(r["cents"], -50, 50) / 50 * center) + center bar = ["-"] * width bar[center] = "|" marker = "\033[92m●\033[0m" if r["in_tune"] else "\033[91m●\033[0m" bar[pos] = marker label = (f"→ {r['target']}" if "target" in r else f"{r['note']}{r['octave']:<2}") line = (f" {label} " f"{''.join(bar)} {r['cents']:+6.1f}¢ " f"({r['freq']:7.2f} Hz)") else: line = " -- " + "-" * width + " listening…" if getattr(tuner, "chords", False) and tuner.chord: c = tuner.chord line += f" ♫ {c['symbol']} ({' '.join(c['notes'])})" print("\r" + line + " " * 16, end="", flush=True) time.sleep(1 / 20) except KeyboardInterrupt: print("\n Stopped.")