feat: add initial implementation of MIDI player and related components

This commit is contained in:
2026-04-26 00:52:03 +08:00
parent 9009a7c5bc
commit 3752ef1f57
11 changed files with 735 additions and 0 deletions
View File
+47
View File
@@ -0,0 +1,47 @@
import asyncio
import time
from collections.abc import AsyncGenerator, Callable
from typing import Never
from mido import Message, MetaMessage, MidiFile, UnknownMetaMessage
class AsyncMidiFile(MidiFile):
async def play_async(
self, meta_messages: bool = False, now: Callable[[], float] = time.monotonic
) -> AsyncGenerator[Message | MetaMessage | UnknownMetaMessage, Never]:
"""Play back all tracks.
The generator will sleep between each message by
default. Messages are yielded with correct timing. The time
attribute is set to the number of seconds slept since the
previous message.
By default you will only get normal MIDI messages. Pass
`meta_messages=True` if you also want meta messages.
You will receive copies of the original messages, so you can
safely modify them without ruining the tracks.
By default the system clock is used for the timing of yielded
MIDI events. To use a different clock (e.g. to synchronize to
an audio stream), pass `now=time_fn` where `time_fn` is a zero
argument function that yields the current time in seconds.
"""
start_time = now()
input_time = 0.0
for msg in self:
input_time += msg.time
playback_time = now() - start_time
duration_to_next_event = input_time - playback_time
if duration_to_next_event > 0.001:
await asyncio.sleep(duration_to_next_event - 0.001)
# await asyncio.to_thread(time.sleep, duration_to_next_event)
if msg.is_meta and not meta_messages:
continue
else:
yield msg
+16
View File
@@ -0,0 +1,16 @@
import enum
class NoteNames12TET(enum.IntEnum):
C = enum.auto()
Db = enum.auto()
D = enum.auto()
Eb = enum.auto()
E = enum.auto()
F = enum.auto()
Gb = enum.auto()
G = enum.auto()
Ab = enum.auto()
A = enum.auto()
Bb = enum.auto()
B = enum.auto()
+10
View File
@@ -0,0 +1,10 @@
from typing import cast
from mido import Message
def translate_note_on_velocity_0(msg: Message) -> Message:
mdi = msg.dict()
if mdi["type"] == "note_on" and cast(int, getattr(msg, "velocity")) == 0:
return Message("note_off", channel=mdi["channel"], note=mdi["note"], velocity=64, time=msg.time)
return msg
+260
View File
@@ -0,0 +1,260 @@
import asyncio
from asyncio import Event
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar, cast, override
import mido
from mido.ports import BaseInput, BaseOutput
from textual import work
from textual.app import App, ComposeResult
from textual.binding import BindingType
from textual.containers import Horizontal, HorizontalScroll, Vertical
from textual.driver import Driver
from textual.types import CSSPathType
from textual.widgets import Footer, Header
from termidi.asyncmidi import AsyncMidiFile
from termidi.const import NoteNames12TET
from termidi.filters import translate_note_on_velocity_0
from termidi.ui.components import PianoKeyboard12Keys
from termidi.ui.fileselector import FileSelector
from termidi.ui.streamselector import StreamSelector
from termidi.ui.waterfall import WaterfallDisplay
if TYPE_CHECKING:
from mido.messages.specs import NoteMsgDict
def midi_note_to_keyname_octave(note: int, octave_offset: int = 0) -> tuple[NoteNames12TET, int]:
octave, semitone = divmod(note, 12)
return NoteNames12TET(semitone + 1), octave + octave_offset
class PlayerApp(App[None]):
TITLE: str | None = "MIDI Player"
DEFAULT_CSS: ClassVar[str] = """
#scroll {
height: 1fr;
}
#inner {
height: 100%;
width: auto;
}
WaterfallDisplay {
height: 1fr;
width: 100%;
}
#kbd {
height: 5;
width: auto;
PianoKeyboard12Keys {
height: 5;
}
}
"""
BINDINGS: ClassVar[list[BindingType]] = [
("q", "quit", "Quit"),
("i", "set_input", "Set MIDI input"),
("o", "set_output", "Set MIDI output"),
("f", "open_file", "Open MIDI file"),
(".", "stop_playing", "Stop playing"),
]
input_stream: BaseInput
output_stream: BaseOutput
_midi_play_interrupt: Event
_midi_play_free: Event
_midi_input_interrupt: Event
def __init__(
self,
driver_class: type[Driver] | None = None,
css_path: CSSPathType | None = None,
watch_css: bool = False,
ansi_color: bool = False,
) -> None:
super().__init__(driver_class, css_path, watch_css, ansi_color)
self.input_stream = mido.open_input()
self.output_stream = mido.open_output()
self._midi_play_interrupt = Event()
self._midi_play_free = Event()
self._midi_play_free.set()
self._midi_input_interrupt = Event()
@override
def compose(self) -> ComposeResult:
yield Header()
yield Footer()
wkwidth = 2
bkwidth = 2
with Vertical():
with HorizontalScroll(id="scroll"):
with Vertical(id="inner"):
yield WaterfallDisplay(wkwidth, bkwidth, id="waterfall")
with Horizontal(id="kbd"):
yield PianoKeyboard12Keys(wkwidth, bkwidth)
yield PianoKeyboard12Keys(wkwidth, bkwidth)
yield PianoKeyboard12Keys(wkwidth, bkwidth)
yield PianoKeyboard12Keys(wkwidth, bkwidth)
yield PianoKeyboard12Keys(wkwidth, bkwidth)
yield PianoKeyboard12Keys(wkwidth, bkwidth)
yield PianoKeyboard12Keys(wkwidth, bkwidth)
yield PianoKeyboard12Keys(wkwidth, bkwidth)
yield PianoKeyboard12Keys(wkwidth, bkwidth)
yield PianoKeyboard12Keys(wkwidth, bkwidth)
yield PianoKeyboard12Keys(wkwidth, bkwidth)
def on_mount(self) -> None:
_ = self.set_interval(1 / 45, self._waterfall_tick)
def _waterfall_tick(self) -> None:
_ = self.query_one(WaterfallDisplay).tick()
def reset_keys(self) -> None:
for kbd in self.query(PianoKeyboard12Keys):
kbd.key_c_on = 0
kbd.key_db_on = 0
kbd.key_d_on = 0
kbd.key_eb_on = 0
kbd.key_e_on = 0
kbd.key_f_on = 0
kbd.key_gb_on = 0
kbd.key_g_on = 0
kbd.key_ab_on = 0
kbd.key_a_on = 0
kbd.key_bb_on = 0
kbd.key_b_on = 0
def set_key_on(
self, key: NoteNames12TET, on: bool = True, octave: int = 4, midi_note: int = -1, velocity: int = 64
) -> None:
wf = self.query_one(WaterfallDisplay)
if midi_note >= 0:
if on:
wf.note_on(midi_note, velocity)
else:
wf.note_off(midi_note)
for idx, kbd in enumerate(self.query(PianoKeyboard12Keys)):
if idx != octave:
continue
match key:
case NoteNames12TET.C:
kbd.key_c_on += on if on else -1
case NoteNames12TET.Db:
kbd.key_db_on += on if on else -1
case NoteNames12TET.D:
kbd.key_d_on += on if on else -1
case NoteNames12TET.Eb:
kbd.key_eb_on += on if on else -1
case NoteNames12TET.E:
kbd.key_e_on += on if on else -1
case NoteNames12TET.F:
kbd.key_f_on += on if on else -1
case NoteNames12TET.Gb:
kbd.key_gb_on += on if on else -1
case NoteNames12TET.G:
kbd.key_g_on += on if on else -1
case NoteNames12TET.Ab:
kbd.key_ab_on += on if on else -1
case NoteNames12TET.A:
kbd.key_a_on += on if on else -1
case NoteNames12TET.Bb:
kbd.key_bb_on += on if on else -1
case NoteNames12TET.B:
kbd.key_b_on += on if on else -1
return
@work
async def play_file(self, file: Path) -> None:
self._midi_play_free.clear()
self._midi_play_interrupt.clear()
self.output_stream.reset()
self.reset_keys()
try:
async for m in AsyncMidiFile(str(file), clip=True).play_async():
if self._midi_play_interrupt.is_set():
break
if isinstance(m, mido.Message):
m = translate_note_on_velocity_0(m)
mdi = m.dict()
self.output_stream.send(m)
if mdi["type"] in {"note_on", "note_off"}:
nmdi = cast("NoteMsgDict", mdi)
note, octave = midi_note_to_keyname_octave(nmdi["note"], octave_offset=0)
is_on = mdi["type"] == "note_on" and nmdi["velocity"] != 0
self.set_key_on(note, is_on, octave=octave, midi_note=nmdi["note"], velocity=nmdi["velocity"])
except OSError as e:
self.notify(repr(e), title="Error occured while playing:", severity="error")
finally:
self.output_stream.reset()
self.reset_keys()
self.query_one(WaterfallDisplay).reset()
self._midi_play_free.set()
@work
async def receive_input(self) -> None:
while True:
msg = self.input_stream.poll()
if self._midi_input_interrupt.is_set():
break
if msg is None:
await asyncio.sleep(0.0005)
continue
self.output_stream.send(msg)
mdi = msg.dict()
if mdi["type"] in {"note_on", "note_off"}:
nmdi = cast("NoteMsgDict", mdi)
note, octave = midi_note_to_keyname_octave(nmdi["note"], octave_offset=0)
is_on = mdi["type"] == "note_on" and nmdi["velocity"] != 0
self.set_key_on(note, is_on, octave=octave, midi_note=nmdi["note"], velocity=nmdi["velocity"])
async def playback_stop(self):
self._midi_play_interrupt.set()
_ = await self._midi_play_free.wait()
async def action_set_input(self) -> None:
stream = StreamSelector()
stream.options = mido.get_input_names()
await self.push_screen(stream, self._input_selected)
async def _input_selected(self, input: str | None):
if input is None:
return
_old_stream = self.input_stream
self._midi_input_interrupt.set()
self.input_stream = mido.open_input(input)
_old_stream.close()
self._midi_input_interrupt.clear()
_ = self.receive_input()
async def action_set_output(self) -> None:
stream = StreamSelector()
stream.options = mido.get_output_names()
await self.push_screen(stream, self._output_selected)
async def _output_selected(self, output: str | None):
if output is None:
return
_old_stream = self.output_stream
self.output_stream = mido.open_output(output)
_old_stream.reset()
_old_stream.close()
async def action_open_file(self) -> None:
fileselector = FileSelector("~/")
await self.push_screen(fileselector, self._file_selected)
async def _file_selected(self, file: Path | None) -> None:
if file is None:
return
self._midi_play_interrupt.set()
_ = await self._midi_play_free.wait()
_ = self.play_file(file)
async def action_stop_playing(self) -> None:
await self.playback_stop()
def main():
app = PlayerApp()
app.run()
+133
View File
@@ -0,0 +1,133 @@
from collections.abc import Generator
from typing import Any, ClassVar, override
from rich.segment import Segment
from textual.geometry import Size
from textual.reactive import reactive
from textual.strip import Strip
from textual.widget import Widget
type KbState12Keys = tuple[int, int, int, int, int, int, int, int, int, int, int, int]
_KB_BLACKCOLOR_12_KEYS: KbState12Keys = False, True, False, True, False, False, True, False, True, False, True, False
class PianoKeyboard12Keys(Widget):
COMPONENT_CLASSES: ClassVar[set[str]] = {
"pianokb--white-key",
"pianokb--white-key-on",
"pianokb--black-key",
"pianokb--black-key-on",
}
DEFAULT_CSS: str = """
PianoKeyboard12Keys {
width: auto;
}
PianoKeyboard12Keys .pianokb--white-key {
color: auto;
background: white;
}
PianoKeyboard12Keys .pianokb--white-key-on {
color: auto;
background: darkgrey;
}
PianoKeyboard12Keys .pianokb--black-key {
color: auto;
background: black;
}
PianoKeyboard12Keys .pianokb--black-key-on {
color: auto;
background: grey;
}
"""
key_c_on: reactive[int] = reactive(0)
key_db_on: reactive[int] = reactive(0)
key_d_on: reactive[int] = reactive(0)
key_eb_on: reactive[int] = reactive(0)
key_e_on: reactive[int] = reactive(0)
key_f_on: reactive[int] = reactive(0)
key_gb_on: reactive[int] = reactive(0)
key_g_on: reactive[int] = reactive(0)
key_ab_on: reactive[int] = reactive(0)
key_a_on: reactive[int] = reactive(0)
key_bb_on: reactive[int] = reactive(0)
key_b_on: reactive[int] = reactive(0)
keyboard_height: reactive[int] = reactive(5)
whitekey_width: reactive[int] = reactive(1)
blackkey_width: reactive[int] = reactive(1)
blackkey_height: reactive[int] = reactive(3)
def __init__(
self,
whitekey_width: int | None = None,
blackkey_width: int | None = None,
blackkey_height: int | None = None,
*,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
markup: bool = True,
):
super().__init__(name=name, id=id, classes=classes, disabled=disabled, markup=markup)
if whitekey_width is not None:
self.whitekey_width = whitekey_width
if blackkey_width is not None:
self.blackkey_width = blackkey_width
if blackkey_height is not None:
self.blackkey_height = blackkey_height
@override
def get_content_width(self, container: Size, viewport: Size) -> int:
return self.whitekey_width * 7 + self.blackkey_width * 5
def _line(self, y: int, kbstate: KbState12Keys) -> Generator[Segment, Any, None]:
whitekey = self.get_component_rich_style("pianokb--white-key")
whitekey_on = self.get_component_rich_style("pianokb--white-key-on")
blackkey = self.get_component_rich_style("pianokb--black-key")
blackkey_on = self.get_component_rich_style("pianokb--black-key-on")
whitesep = y >= self.blackkey_height
whitesep_sym = "\N{BOX DRAWINGS LIGHT VERTICAL}"
seg_whitekey = Segment(" " * self.whitekey_width, whitekey), Segment(" " * self.whitekey_width, whitekey_on)
seg_blackkey = Segment(" " * self.blackkey_width, blackkey), Segment(" " * self.blackkey_width, blackkey_on)
seg_blackkey_whitesep = (
Segment(" " * (self.blackkey_width // 2), whitekey),
Segment(" " * (self.blackkey_width // 2), whitekey_on),
)
seg_blackkey_whitesep_bar = Segment(whitesep_sym, style=whitekey)
for on, black, left_on, right_on in zip(kbstate, _KB_BLACKCOLOR_12_KEYS, [0, *kbstate[:-1]], [*kbstate[1:], 0]):
if black:
if not whitesep:
yield seg_blackkey[not not on]
else:
yield seg_blackkey_whitesep[not not left_on]
if self.blackkey_width & 1:
yield seg_blackkey_whitesep_bar
yield seg_blackkey_whitesep[not not right_on]
else:
yield seg_whitekey[not not on]
@override
def render_line(self, y: int) -> Strip:
return Strip(
self._line(
y,
(
self.key_c_on,
self.key_db_on,
self.key_d_on,
self.key_eb_on,
self.key_e_on,
self.key_f_on,
self.key_gb_on,
self.key_g_on,
self.key_ab_on,
self.key_a_on,
self.key_bb_on,
self.key_b_on,
),
),
self.whitekey_width * 7 + self.blackkey_width * 5,
)
+56
View File
@@ -0,0 +1,56 @@
from collections.abc import Iterable
from pathlib import Path
from typing import ClassVar, override
from textual.app import ComposeResult
from textual.binding import BindingType
from textual.reactive import reactive
from textual.screen import ModalScreen
from textual.widgets import DirectoryTree
class FilteredDirectoryTree(DirectoryTree):
show_hidden: reactive[bool] = reactive(False)
suffixes: tuple[str, ...] = (".mid", ".smf")
@override
def filter_paths(self, paths: Iterable[Path]) -> Iterable[Path]:
for path in paths:
if not self.show_hidden and path.name.startswith("."):
continue
if path.is_file() and path.suffix not in self.suffixes:
continue
yield path
class FileSelector(ModalScreen[Path]):
DEFAULT_CSS: str = """
FileSelector {
align: center middle;
}
FilteredDirectoryTree {
min-width: 40;
max-width: 120;
max-height: 40;
}
"""
BINDINGS: ClassVar[list[BindingType]] = [("esc", "dismiss", "Cancel")]
show_hidden: reactive[bool] = reactive(False)
start: str | Path
def __init__(
self, start: str | Path = "./", name: str | None = None, id: str | None = None, classes: str | None = None
) -> None:
super().__init__(name, id, classes)
self.start = start
@override
def compose(self) -> ComposeResult:
yield FilteredDirectoryTree(self.start).data_bind(self.__class__.show_hidden)
def on_directory_tree_file_selected(self, event: FilteredDirectoryTree.FileSelected):
if event.path.is_file() and event.path.suffix in FilteredDirectoryTree.suffixes:
_ = self.dismiss(event.path)
+35
View File
@@ -0,0 +1,35 @@
from collections.abc import Sequence
from typing import ClassVar, override
from textual.app import ComposeResult
from textual.binding import BindingType
from textual.reactive import var
from textual.screen import ModalScreen
from textual.widgets import OptionList
class StreamSelector(ModalScreen[str]):
DEFAULT_CSS: str = """
StreamSelector {
align: center middle;
}
OptionList {
min-width: 40;
max-width: 80;
max-height: 40;
}
"""
options: var[Sequence[str]] = var(list)
BINDINGS: ClassVar[list[BindingType]] = [("esc", "dismiss", "Cancel")]
@override
def compose(self) -> ComposeResult:
yield OptionList()
def on_mount(self) -> None:
option_list = self.query_one(OptionList)
_ = option_list.clear_options().add_options(self.options)
def on_option_list_option_selected(self, event: OptionList.OptionSelected) -> None:
_ = self.dismiss(self.options[event.option_index])
+178
View File
@@ -0,0 +1,178 @@
"""Strip-based note waterfall widget for termidi."""
from collections.abc import Generator
from typing import Any, ClassVar, override
from rich.color import Color
from rich.segment import Segment
from rich.style import Style
from textual import work
from textual.geometry import Size
from textual.strip import Strip
from textual.widget import Widget
# Note index within an octave (0-indexed) → is black key
_IS_BLACK: tuple[bool, ...] = (False, True, False, True, False, False, True, False, True, False, True, False)
# Number of cols occupied by each note slot given key widths
# White keys → whitekey_width, black keys → blackkey_width
# This must match PianoKeyboard12Keys._line() column layout exactly.
def _note_col_offsets(whitekey_width: int, blackkey_width: int) -> tuple[int, ...]:
"""Return the starting column of each of the 12 notes within one octave."""
offsets: list[int] = []
col = 0
for is_black in _IS_BLACK:
offsets.append(col)
col += blackkey_width if is_black else whitekey_width
return tuple(offsets)
def _octave_width(whitekey_width: int, blackkey_width: int) -> int:
return whitekey_width * 7 + blackkey_width * 5
# midi note → (octave_idx, note_idx_in_octave)
def _note_to_octave_semitone(midi_note: int) -> tuple[int, int]:
octave, semitone = divmod(midi_note, 12)
return octave, semitone
# Velocity → RGB color: low velocity = dim blue, high = bright cyan/white
def _velocity_color(velocity: int) -> Color:
t = max(0, min(127, velocity)) / 127.0
r = int(80 + 175 * t)
g = int(100 + 155 * t)
b = int(200 + 55 * t)
return Color.from_rgb(r, g, b)
class WaterfallDisplay(Widget):
"""Scrolling note waterfall rendered with Strip/Segment, aligned to PianoKeyboard12Keys."""
COMPONENT_CLASSES: ClassVar[set[str]] = {
"waterfall--background",
}
DEFAULT_CSS: ClassVar[str] = """
WaterfallDisplay {
background: $surface-darken-2;
}
"""
def __init__(
self,
whitekey_width: int = 2,
blackkey_width: int = 2,
num_octaves: int = 11,
*,
name: str | None = None,
id: str | None = None,
classes: str | None = None,
disabled: bool = False,
) -> None:
super().__init__(name=name, id=id, classes=classes, disabled=disabled)
self._whitekey_width: int = whitekey_width
self._blackkey_width: int = blackkey_width
self._num_octaves: int = num_octaves
self._oct_width: int = _octave_width(whitekey_width, blackkey_width)
self._note_offsets: tuple[int, ...] = _note_col_offsets(whitekey_width, blackkey_width)
self._total_width: int = self._oct_width * num_octaves
# buffer: list of rows (bottom = index 0, top = last index)
# Each row is a dict mapping col_start → (width, Style)
self._rows: list[dict[int, tuple[int, Style]]] = []
# Currently held notes: midi_note → velocity (for painting while key is held)
self._held: dict[int, int] = {}
@override
def get_content_width(self, container: Size, viewport: Size) -> int:
return self._total_width
def _render_width(self) -> int:
w = self.size.width
return w if w > 0 else self._total_width
def _ensure_rows(self, height: int) -> None:
"""Grow the row buffer to match current widget height."""
while len(self._rows) < height:
self._rows.append({})
def _col_for_note(self, midi_note: int) -> int:
"""Column offset (within the full waterfall width) for a given MIDI note."""
octave_idx, semitone = _note_to_octave_semitone(midi_note)
if octave_idx >= self._num_octaves:
return -1
return octave_idx * self._oct_width + self._note_offsets[semitone]
def _note_width(self, midi_note: int) -> int:
_, semitone = _note_to_octave_semitone(midi_note)
return self._blackkey_width if _IS_BLACK[semitone] else self._whitekey_width
def note_on(self, midi_note: int, velocity: int) -> None:
"""Call when a note_on event is received."""
if velocity == 0:
self.note_off(midi_note)
return
self._held[midi_note] = velocity
def note_off(self, midi_note: int) -> None:
"""Call when a note_off event is received."""
_ = self._held.pop(midi_note, None)
def reset(self) -> None:
"""Clear all held notes and the buffer."""
self._held.clear()
self._rows.clear()
_ = self.refresh()
@work(thread=True)
def tick(self) -> None:
"""Advance the waterfall by one row. Call on a regular interval."""
height = self.size.height
if height <= 0:
return
self._ensure_rows(height)
# Build the new top row from currently held notes
new_row: dict[int, tuple[int, Style]] = {}
for midi_note, velocity in self._held.items():
col = self._col_for_note(midi_note)
if col < 0:
continue
w = self._note_width(midi_note)
color = _velocity_color(velocity)
new_row[col] = (w, Style(bgcolor=color))
# Shift buffer: discard oldest (index 0), append new row at end
if len(self._rows) >= height:
_ = self._rows.pop(0)
self._rows.append(new_row)
_ = self.refresh()
def _render_row(self, row: dict[int, tuple[int, Style]], width: int) -> Generator[Segment, Any, None]:
"""Yield Segments for one waterfall row."""
bg = Style.null()
pos = 0
for col in sorted(row):
if col >= width:
break
if col > pos:
yield Segment(" " * (col - pos), bg)
w, style = row[col]
w = min(w, width - col)
yield Segment(" " * w, style)
pos = col + w
if pos < width:
yield Segment(" " * (width - pos), bg)
@override
def render_line(self, y: int) -> Strip:
height = self.size.height
width = self._render_width()
self._ensure_rows(height)
# y=0 is top (oldest notes), y=height-1 is bottom (newest, adjacent to keyboard).
# _rows is oldest-first; offset so that empty space sits at the top when buffer
# hasn't filled yet.
row_idx = y - (height - len(self._rows))
if row_idx < 0 or row_idx >= len(self._rows):
return Strip([Segment(" " * width)], width)
row = self._rows[row_idx]
return Strip(list(self._render_row(row, width)), width)
View File
View File