"""
A simple (and cheesy-sounding) sound-synthesis library for playing with
some basic sounds.

This library does not by default play any sounds using your system's
speakers. It just generates signals (functions of time whose values lie
between -1 and 1), which it can save as .wav files to be played by
another program. If you have the `simpleaudio` package installed,
however, you will be able to play sounds directly.

This module defines constants named C0 through B9 which hold the
frequency values of notes in scientific pitch notation, and it also
defines constants P0 through P14 which hold three octaves worth of notes
from a pentatonic scale where the middle octave starts with P5 == C4.
"""

__version__ = "0.5.0"

import itertools
import math
import random
import struct
import time
import wave

# Import simpleaudio if it's available, but don't make a fuss if it isn't.
# The catch is deliberately broad: any failure while importing the optional
# package just means direct playback is unavailable.
try:
    import simpleaudio
except Exception:
    simpleaudio = None

DEBUG = False
"""
Controls whether debugging messages get printed or not.
"""


def _debug(*args, **kwargs):
    """
    Debugging function; supply same arguments as for `print`. Prints
    only when the module-level DEBUG flag is true.
    """
    if DEBUG:
        print(*args, **kwargs)
    # else do nothing


#-----------#
# Constants #
#-----------#

SOUND_SPEED = 340.29
"""
The speed of sound, in meters/second. Useful for computing reverb delays.
"""

SAMPLE_RATE = 44100
"""
We do all of our processing at this sample rate.
"""

DT = 1 / SAMPLE_RATE
"""
From the sample rate we can compute the time delta between samples.
"""

SAMPLE_WIDTH = 2
"""
Bytes per sample.
"""

DEFAULT_LIMITER_DELAY = SAMPLE_RATE // 100  # 1/100th of a second
"""
Default delay for the limiter, in frames.
"""

DEFAULT_LIMITER_RELAX_DURATION = SAMPLE_RATE // 4  # 1/4 of a second
"""
Default relax duration for the limiter, in frames.
"""


#----------------#
# Tone Constants #
#----------------#

SEMITONE = 2**(1/12)

PITCH_NAMES = ["C", "Db", "D", "Eb", "E", "F", "Gb", "G", "Ab", "A", "Bb", "B"]

SHARP_NAMES = {
    "Db": "Cs",
    "Eb": "Ds",
    "Gb": "Fs",
    "Ab": "Gs",
    "Bb": "As"
}

PIANO_KEYS = []

# see: https://en.wikipedia.org/wiki/Scientific_pitch_notation
BASE_FREQUENCY = 16.352
"""
The frequency of C0, the lowest note in scientific pitch notation.
"""

# Set extra global variables based on pitch names:
_freq = BASE_FREQUENCY
for _octave in range(10):
    for _pitch in PITCH_NAMES:
        _rounded = round(_freq, 4)
        globals()[_pitch + str(_octave)] = _rounded
        PIANO_KEYS.append((_pitch + str(_octave), _rounded))
        if _pitch in SHARP_NAMES:
            globals()[SHARP_NAMES[_pitch] + str(_octave)] = _rounded
        _freq *= SEMITONE

# Pentatonic scale with a more limited octave range than the piano keys
# See: https://en.wikipedia.org/wiki/Pentatonic_scale
PENTATONIC_NOTES = [
    ("P0", C3),
    ("P1", D3),
    ("P2", E3),
    ("P3", G3),
    ("P4", A3),
    ("P5", C4),
    ("P6", D4),
    ("P7", E4),
    ("P8", G4),
    ("P9", A4),
    ("P10", C5),
    ("P11", D5),
    ("P12", E5),
    ("P13", G5),
    ("P14", A5),
]

for _name, _freq in PENTATONIC_NOTES:
    globals()[_name] = _freq


#-----------------#
# Tone management #
#-----------------#

def halfStepUp(pitch, nSteps=1):
    """
    Returns the tone that's nSteps (default 1) half-step(s) above the
    given pitch (nSteps may be negative). Raises a TypeError if the
    pitch is not a number.
    """
    if not isinstance(pitch, (int, float)):
        raise TypeError("Pitch must be a number.")
    return pitch * SEMITONE**nSteps


def halfStepDown(pitch, nSteps=1):
    """
    Returns the tone that's nSteps (default 1) half-step(s) below the
    given pitch.
    """
    return halfStepUp(pitch, -nSteps)


def pianoIndex(pitch):
    """
    Finds the index within the piano keys of the pitch that's closest to
    the given pitch, or returns None if the pitch is too high or low
    (including zero and negative pitches, which have no logarithm).
    """
    if pitch <= 0:
        return None
    # Number of semitones above C0, rounded to the nearest key. PIANO_KEYS
    # holds one entry per semitone starting at C0, so this is the index.
    semitones = round(math.log(pitch / C0) / math.log(SEMITONE))
    if 0 <= semitones < len(PIANO_KEYS):
        return semitones
    # No matches found
    return None


def pentatonicIndex(pitch):
    """
    Finds the index within the pentatonic tones of the pitch that's
    closest (on a logarithmic scale) to the given pitch. Returns one end
    of the scale if the given pitch is too high or too low; zero and
    negative pitches map to the low end.
    """
    if pitch <= 0:
        return 0
    target = math.log(pitch)
    # min() keeps the first of several equally-close notes.
    return min(
        range(len(PENTATONIC_NOTES)),
        key=lambda i: abs(math.log(PENTATONIC_NOTES[i][1]) - target)
    )


def stepUp(pitch, nSteps=1):
    """
    Returns the tone that's nSteps (default 1) whole-step(s) above the
    given pitch, on an 8-note octave. Detects the nearest piano-key
    pitch and gives the tone in steps above (or below if nSteps is
    negative) that. If the pitch is out of range of the piano keys (or
    within two keys of either end), simply returns a pitch two semitones
    higher per step.

    Raises a TypeError if pitch is not a number or nSteps is not an
    integer.
    """
    if not isinstance(pitch, (int, float)):
        raise TypeError("Pitch must be a number.")
    if not isinstance(nSteps, int):
        raise TypeError("nSteps must be an integer.")
    if nSteps == 0:
        return pitch

    index = pianoIndex(pitch)
    if (
        index is None
     or index + 2 > len(PIANO_KEYS) - 1
     or index - 2 < 0
    ):
        return pitch * SEMITONE**(2*nSteps)

    direction = 1 if nSteps > 0 else -1
    nextName, nextFreq = PIANO_KEYS[index + direction]
    if 'b' in nextName:  # it's a black key, so we skip it
        nextFreq = PIANO_KEYS[index + 2*direction][1]
    if nSteps == direction:
        return nextFreq
    # Take the remaining steps one at a time from the new pitch.
    return stepUp(nextFreq, nSteps - direction)


def stepDown(pitch, nSteps=1):
    """
    Returns the tone that's nSteps (default 1) whole-step(s) below the
    given pitch, on an 8-note octave. Detects the nearest piano-key
    pitch and gives the tone one step below that. If the pitch is out of
    range of the piano keys, simply returns a pitch two semitones lower.
    """
    return stepUp(pitch, -nSteps)


def leapUp(pitch, nSteps=1):
    """
    Works like stepUp, but for the pentatonic scale. Note that the
    pentatonic scale has much more limited range than the piano keys.
    When given a pitch that falls between notes on the pentatonic scale,
    this function finds the nearest pentatonic pitch and uses that as
    the basis for the pitch. When outside of the range of the pentatonic
    scale, this function returns a note that's four semitones above (or
    below, if nSteps is negative) the given pitch.

    Raises a TypeError if pitch is not a number or nSteps is not an
    integer.
    """
    if not isinstance(pitch, (int, float)):
        raise TypeError("Pitch must be a number.")
    if not isinstance(nSteps, int):
        raise TypeError("nSteps must be an integer.")
    if nSteps == 0:
        return pitch

    index = pentatonicIndex(pitch)
    default = pitch * SEMITONE**(4 * nSteps)
    if (
        index is None
     or index + 1 > len(PENTATONIC_NOTES) - 1
     or index - 1 < 0
     or default < PENTATONIC_NOTES[0][1]
     or default > PENTATONIC_NOTES[-1][1]
    ):
        return default

    if nSteps == 1:
        return PENTATONIC_NOTES[index + 1][1]
    elif nSteps == -1:
        return PENTATONIC_NOTES[index - 1][1]
    elif nSteps > 1:
        return leapUp(PENTATONIC_NOTES[index + 1][1], nSteps - 1)
    else:  # less than -1
        return leapUp(PENTATONIC_NOTES[index - 1][1], nSteps + 1)


def leapDown(pitch, nSteps=1):
    """
    The opposite of leapUp. When outside of the range of the pentatonic
    scale, this function returns a note that's four semitones below the
    given pitch.
    """
    return leapUp(pitch, -nSteps)


#-----------------#
# Basic waveforms #
#-----------------#

def silence(t):
    """
    Zero-amplitude silence.
    """
    return 0


def sine(t):
    """
    A 1-hertz sine wave as a function of time in seconds.
    """
    return math.sin(t * 2 * math.pi)


def triangle(t):
    """
    A 1-hertz triangle wave as a function of time in seconds.
    """
    tt = t % 1.0
    if tt < 0.25:
        return tt / 0.25
    elif tt < 0.75:
        return 1 - 2 * ((tt - 0.25) / 0.5)
    else:
        return -1 + (tt - 0.75) / 0.25


def sawtooth(t):
    """
    A 1-hertz sawtooth wave as a function of time in seconds.
    """
    tt = t % 1.0
    if tt < 0.5:
        return tt / 0.5
    else:
        return -1 + (tt - 0.5) / 0.5


def square(t):
    """
    A 1-hertz square wave as a function of time in seconds.
    """
    tt = t % 1.0
    if tt < 0.5:
        return 1
    else:
        return -1


def whiteNoise(_):
    """
    White noise. Pure (pseudo-)random samples from the entire domain.
    The parameter is ignored, but exists so that whiteNoise is a valid
    signal function. Note that this gives whiteNoise an infinitely
    fractal nature: it cannot be distorted or paced as the result will
    still be the original noise.
    """
    return 1 - 2 * random.random()


def lfsr(x):
    """
    A tiny chaos engine: a linear-feedback shift register.
    See: https://en.wikipedia.org/wiki/Linear-feedback_shift_register
    """
    lsb = x & 1
    r = x >> 1
    # pseudo-if
    r ^= lsb * 0xe800000000000000  # 64, 63, 61, 60
    return r & ((1 << 64) - 1)  # mask to 64 bits


def prng(x):
    """
    Repeated application of an lfsr to get pseudo-random values.
    Empirically, 12 rounds seems to be enough mixing to scramble all 64
    bits when results are observed from sequential seeds. This is not by
    any means a high-quality prng.
    """
    for i in range(12):
        x = lfsr(x * 37 + i * 31)
    return x


def fval(pval):
    """
    Converts a PRNG output into a floating-point number between 0 and 1.
    """
    return pval / ((1 << 64) - 1)


def ival(fval):
    """
    Converts a floating-point value into an integer useable as a prng
    value (but there's no obvious relationship between the float that
    goes in and the int that comes out).
    """
    b = struct.pack("d", fval)
    return struct.unpack("q", b)[0]


BROWN_NOISE_INTERVAL = 8*DT
"""
Rate at which we let the brown noise wander.
"""


def _brownianWalk(start, seed, steps):
    """
    Walks the given number of pseudo-random steps (each at most 0.1 in
    either direction) away from the start value, reflecting off of the
    -1 and 1 boundaries so the result stays in range. The seed (a float)
    determines the sequence of steps.
    """
    thread = ival(seed)
    sample = start
    for _ in range(steps):
        thread = prng(thread)
        sample += 0.1 * (1 - 2 * fval(thread))
        if sample > 1:
            sample = 1 - (sample - 1)
        elif sample < -1:
            # Mirror the overshoot back above -1.
            sample = -1 - (sample + 1)
    return sample


def brownishNoise(t):
    """
    Brown noise, via Brownian motion in the sample domain with steps
    every BROWN_NOISE_INTERVAL seconds, fixed to pass through a
    pseudo-random target value every second.
    See: https://en.wikipedia.org/wiki/Brownian_noise
    """
    before = math.floor(t)
    after = math.ceil(t)
    anchor1 = 1 - 2 * fval(prng(before))
    anchor2 = 1 - 2 * fval(prng(after))
    tt = t % 1
    stepsForward = int(tt / BROWN_NOISE_INTERVAL)
    stepsBackward = int((1 - tt) / BROWN_NOISE_INTERVAL)

    # TODO: This is untenably time-consuming!
    # Compute Brownian noise forward from the first anchor
    sample1 = _brownianWalk(anchor1, anchor2, stepsForward)
    # Compute Brownian noise backward from the second anchor
    sample2 = _brownianWalk(anchor2, anchor1, stepsBackward)

    # Interpolate between the sequence from either extreme so that we get
    # a smooth signal.
    return (1 - tt) * sample1 + tt * sample2


#--------------#
# Equalization #
#--------------#

# TODO: Figure these out in equation format!
#def lowpass(stream, cutoffFrequency):
#    """
#    A simple infinite-impulse-response low-pass filter with the given
#    cutoff frequency. This function accepts an iterable and is a
#    generator for the filtered signal.
#
#    Reference: https://en.wikipedia.org/wiki/Low-pass_filter
#    """
#    radiansPerSample = 2 * math.pi * DT * cutoffFrequency
#    alpha = radiansPerSample / (radiansPerSample + 1)
#    prev = next(stream)
#    yield prev
#    for sample in stream:
#        sample = next(stream)
#        output = prev + alpha * (sample - prev)
#        prev = output
#        yield output
#
#
#def highpass(stream, cutoffFrequency):
#    """
#    A simple high-pass filter with the given cutoff frequency. This
#    function accepts an iterable and is a generator for the filtered
#    signal.
#
#    Reference: https://en.wikipedia.org/wiki/High-pass_filter
#    """
#    radiansPerSample = 2 * math.pi * DT * cutoffFrequency
#    alpha = 1 / (radiansPerSample + 1)
#    prevSamp = next(stream)
#    prevOut = prevSamp
#    yield prevOut
#    for sample in stream:
#        # TODO: The 2*alpha denominator normalizes, but it's not in the
#        # suggested equation? ...
#        output = (alpha * prevOut + alpha * (sample - prevSamp)) / 2*alpha
#        prevSamp = sample
#        prevOut = output
#        yield output
#
#
#def equalize(stream, low, mid, high):
#    """
#    A *very* rough frequency-domain equalizer that accepts floating-point
#    values between 0 and 1 for each of 3 bands: low from 0-100 hertz, mid
#    from 100-1000 hertz, and high above 1000 hertz.
#
#    TODO: This is horribly broken!
#    """
#    lowStream = lowpass(stream, 100)
#    midStream = lowpass(highpass(stream, 100), 1000)
#    highStream = highpass(stream, 1000)
#    while True:
#        mix = (
#            low * next(lowStream)
#          + mid * next(midStream)
#          + high * next(highStream)
#        )
#        yield min(1, max(-1, mix))


#-------------------#
# Wave modification #
#-------------------#

def reverb(signal, gain, delay):
    """
    Applies reverb at the given gain level with the given delay (in
    seconds). You can use SOUND_SPEED to compute delay for a given room
    size in meters:

        delay = roomSize / SOUND_SPEED

    This applies three jumps of reverb, gain should normally be less
    than 1 or things will get weird.
    """
    # The weights don't depend on time, so compute their sum just once.
    totalWeight = 1 + gain + gain**2 + gain**3

    def reverberant(t):
        """A reverberating signal."""
        back3 = signal(t - delay*3)
        back2 = signal(t - delay*2)
        back1 = signal(t - delay*1)
        return (
            signal(t)
          + back1 * gain
          + back2 * gain**2
          + back3 * gain**3
        ) / totalWeight

    return reverberant


def distort(signal, distortion, distortionMagnitude):
    """
    Distorts the given stream values using the given distortion signal.
    The distortion signal is scaled to the given distortion magnitude in
    seconds.
    """
    def distorted(t):
        """A distorted signal."""
        tt = t + distortion(t) * distortionMagnitude
        return signal(tt)

    return distorted


def gain(signal, amount):
    """
    Amplifies or attenuates the given signal by the given gain amount.
    """
    def gained(t):
        """An amplified or attenuated signal."""
        return signal(t) * amount

    return gained


def pace(signal, tempo):
    """
    Sets the speed of the given signal to the given tempo (in Hertz).
    Negative tempos result in a backwards signal.
    """
    def paced(t):
        """A signal with a specified tempo."""
        tt = t * tempo
        return signal(tt)

    return paced


def shift(signal, delay):
    """
    Shifts the signal in time by the given delay amount (in seconds). A
    negative delay can shift a signal backward in time.
    """
    def shifted(t):
        """A signal shifted in time."""
        return signal(t - delay)

    return shifted


#------------------#
# Wave Combination #
#------------------#

def modulate(*signals):
    """
    Multiplies multiple signals. The resulting signal's docstring lists
    the docstrings of the signals that were combined.
    """
    def modulated(t):
        """The product of several signals."""
        result = 1
        for signal in signals:
            result *= signal(t)
        return result

    modulated.__doc__ = (
        "The product of several signals:\n"
        "  {}\n"
    ).format(
        '\n  '.join(
            # Signals without a docstring (e.g., lambdas) have __doc__ set
            # to None, which join() would reject.
            str(signal.__doc__)
            for signal in signals
        )
    )

    return modulated


def mix(*gainedSignals, normalize=False):
    """
    Mixes multiple signals together. Each argument should be a tuple
    containing a floating point number for the gain of that signal, and
    a signal (a function of time in seconds), or it can be just a signal
    function, in which case the default gain of 1.0 is used. Each signal
    is multiplied by the associated gain before being added to the
    overall result, and if normalize is set to true, after adding the
    signals together the resulting signal will be divided by the sum of
    their gains.

    An example:

        mix(
            pace(sine, 440),
            (0.2, pace(sine, 880))
        )
    """
    withGains = [
        (1.0, pair) if not isinstance(pair, (list, tuple)) else pair
        for pair in gainedSignals
    ]
    # The gains are fixed, so their sum is computed once rather than for
    # every sample.
    combinedWeight = sum(weight for (weight, _) in withGains)

    def mixed(t):
        """The result of mixing several signals together."""
        result = 0
        for weight, signal in withGains:
            result += weight * signal(t)
        if normalize:
            return result / combinedWeight
        else:
            return result

    mixed.__doc__ = (
        "A {}mix of several signals at different volume levels:\n"
        "  {}\n"
    ).format(
        'normalized ' if normalize else '',
        '\n  '.join(
            '({:.2f}) {}'.format(gain, signal.__doc__)
            for (gain, signal) in withGains
        )
    )

    return mixed


def crossfade(before, after, delay, fadeDuration, fadeShape=lambda x: x):
    """
    Fades between two signals, from the before signal (full volume until
    the delay has elapsed) to the after signals (full volume after the
    delay + the fade duration). In between, the shape of the fade is
    controlled by the fadeShape function, which gets the fraction of the
    fade elapsed as an input (0-1) and should return the fraction of the
    after stream to include (0-1; should start at 0 and end at 1).
    """
    def crossfaded(t):
        """A crossfade between two signals."""
        if t < delay:
            return before(t)
        elif t < delay + fadeDuration:
            interp = (t - delay) / fadeDuration
            interp = fadeShape(interp)
            return (1 - interp) * before(t) + interp * after(t)
        else:
            return after(t)

    crossfaded.__doc__ = (
        "A crossfade between two signals at {:.2f}s for {:.3f}s:\n"
        "  The first signal is:\n    {}\n"
        "  The second signal is:\n    {}\n"
    ).format(
        delay,
        fadeDuration,
        before.__doc__,
        after.__doc__
    )

    return crossfaded


def splice(base, signal, when, duration):
    """
    Splices the given signal onto the given base signal, cutting with no
    transition to the given signal at the given time, and after the
    given duration, cutting back to the base signal. Unless the sample
    values match up at the endpoints of the splice, this may create
    clicking sounds. The spliced-in signal's time will be shifted so
    that t=0 is aligned to the beginning of the splice.
    """
    def spliced(t):
        """A base signal with a different signal spliced into it."""
        if t < when:
            return base(t)
        elif t < when + duration:
            return signal(t - when)
        else:
            return base(t)

    spliced.__doc__ = (
        "A spliced signal at {:.2f}s for {:.3f}s:\n"
        "  The base signal is:\n    {}\n"
        "  The spliced signal is:\n    {}\n"
    ).format(
        when,
        duration,
        base.__doc__,
        signal.__doc__
    )

    return spliced


def stack(base, signal, when, duration):
    """
    Stacks the given signal onto the given base signal, starting the
    stacked signal's t=0 at the given when value, and ending the
    stacking after the given duration. Stacking signals may cause
    out-of-range values, which will be handled during limiting. If the
    signal values at the endpoints of the stacked region aren't zero,
    this may create clicking sounds.
    """
    def stacked(t):
        """A base signal with a different signal stacked into it."""
        if t < when:
            return base(t)
        elif t < when + duration:
            return base(t) + signal(t - when)
        else:
            return base(t)

    stacked.__doc__ = (
        "A stacked signal at {:.2f}s for {:.3f}s:\n"
        "  The base signal is:\n    {}\n"
        "  The stacked signal is:\n    {}\n"
    ).format(
        when,
        duration,
        base.__doc__,
        signal.__doc__
    )

    return stacked


#-----------------------#
# Finite-energy signals #
#-----------------------#

def attack(signal, duration, shape=lambda x: x):
    """
    A version of the given signal with gain 0 at t=0, and 1 after the
    given duration, but in between it's the result of a linear gradient
    passed into the given shape function.
    """
    def attacked(t):
        """A signal that starts silent and ramps up at t=0."""
        if t <= 0:
            return 0
        elif t < duration:
            tt = t / duration
            return shape(tt) * signal(t)
        else:
            return signal(t)

    attacked.__doc__ = (
        "A signal which starts from silence and swells over {:.3f} seconds:\n"
        "  {}\n"
    ).format(
        duration,
        signal.__doc__
    )

    return attacked


def fade(signal, delay, duration, shape=lambda x: x):
    """
    A version of the given signal with gain 1 at t<=delay, gain 0 after
    the given duration, and in between it's 1 - the result of a linear
    gradient passed into the given shape function.
    """
    def faded(t):
        """A fade signal."""
        if t <= delay:
            return signal(t)
        elif t < delay + duration:
            tt = (t - delay) / duration
            return (1 - shape(tt)) * signal(t)
        else:
            return 0

    faded.__doc__ = (
        "A signal which fades into silence over {:.3f}s at {:.2f}s:\n"
        "  {}\n"
    ).format(
        duration,
        delay,
        signal.__doc__
    )

    return faded


def note(
    waveform,
    duration=0.5,
    attackFraction=0.07,
    fadeFraction=0.5,
    attackShape=lambda x: x,
    fadeShape=lambda x: x
):
    """
    Combines an infinite waveform with attack and fade shapes and a
    finite duration to produce a finite-energy note signal. The note
    always starts at t = 0. The attack and fade fractions give the
    portions of the duration spent ramping up at the start and fading
    out at the end.
    """
    result = attack(waveform, duration * attackFraction, attackShape)
    result = fade(
        result,
        duration * (1 - fadeFraction),
        duration * fadeFraction,
        fadeShape
    )
    return result


#-------------#
# Instruments #
#-------------#

def beep(hz, duration):
    """
    Generates a relatively pure note: a triangle-wave fundamental mixed
    with quieter sine-wave overtones.
    """
    waveform = mix(
        pace(triangle, hz),
        (0.2, pace(sine, hz*2)),
        (0.1, pace(sine, hz*4)),
        normalize=True
    )
    return note(waveform, duration)


def harmonica(hz, duration):
    """
    Generates a sawtooth-based note with odd overtones and a bit of
    reverb.
    """
    waveform = mix(
        pace(sawtooth, hz),
        (0.2, pace(sawtooth, hz*3)),
        (0.1, pace(sawtooth, hz*5)),
        (0.05, pace(sawtooth, hz*7)),
        normalize=True
    )
    waveform = reverb(waveform, 0.2, 0.17 / SOUND_SPEED)
    return note(
        waveform,
        duration,
        attackFraction=0.2,
        fadeFraction=0.6,
        attackShape=lambda x: x ** 2,
        fadeShape=lambda x: x ** 2
    )


def keyboard(hz, duration):
    """
    Generates a triangle-based note with even overtones.
    """
    waveform = mix(
        pace(triangle, hz),
        (0.2, pace(triangle, hz*2)),
        (0.1, pace(triangle, hz*4)),
        (0.05, pace(triangle, hz*6)),
        normalize=True
    )
    return note(
        waveform,
        duration,
        attackFraction=0.04,
        fadeFraction=0.8,
        attackShape=lambda x: x ** 0.5,
        fadeShape=lambda x: x
    )


def snare(duration):
    """
    Generates wave-modulated white noise with an instant attack.
    """
    waveform = mix(
        modulate(whiteNoise, pace(sine, 193)),
        (0.3, modulate(whiteNoise, pace(triangle, 182))),
        normalize=True
    )
    return note(
        waveform,
        duration,
        attackFraction=0,
        fadeFraction=0.9,
        fadeShape=lambda x: x ** 0.5  # sqrt fade is faster than linear
    )


def kick(duration):
    """
    Generates an impulse plus some low-frequency waves like a kick drum.
    """
    pulseAttack = duration / 192
    # The constant-zero fade shape keeps the click at full volume until
    # it cuts off at the end of the pulse.
    click = fade(
        pace(triangle, 1/(2*pulseAttack)),
        0,
        pulseAttack,
        lambda x: 0
    )
    vibrate = mix(
        (0.7, pace(sine, 52)),
        (0.2, pace(sine, 52 * SEMITONE)),
        (0.1, pace(sine, 52 * SEMITONE**3)),
        normalize=True
    )
    clickThenVibrate = crossfade(
        click,
        vibrate,
        pulseAttack*0.95,
        pulseAttack*0.3
    )
    return note(
        clickThenVibrate,
        duration,
        attackFraction=0,
        fadeFraction=0.3,
        fadeShape=lambda x: x ** 0.5  # sqrt fade is faster than linear
    )


#-----------------#
# Data conversion #
#-----------------#

def stream(signal, start=0, end=None):
    """
    Converts a signal to a stream of samples, starting at t=0, or the
    given start time. This function is an infinite generator for samples
    from the given stream, unless end= is provided, in which case the
    generator is exhausted when time reaches that point. Samples are DT
    seconds apart.
    """
    t = start
    if end is None:
        while True:
            yield signal(t)
            t += DT
    else:
        while t < end:
            yield signal(t)
            t += DT


# These variables correspond to the three limiter states (see limited_stream)
LIMITER_STATE_UNCOMPRESSED = 0
LIMITER_STATE_COMPRESSING = 1
LIMITER_STATE_RELAXING = 2


def limited_stream(stream, threshold, release_delay, release_duration):
    """
    Returns samples from the given stream with values limited to the
    [-threshold, threshold] range. The limiter remembers the peak value
    observed and compresses at the necessary ratio to clip that value
    ratio for the given release_delay number of frames afterwards, and
    after that the compression ratio scales linearly back down to 1 over
    the given release_duration (in frames). A release_duration of zero
    (or less) drops the compression immediately once the delay is over.
    """
    ratio = 1.0
    frames_since_peak = 0
    frames_since_relax = 0
    peak_value = 0
    peak_ratio = 0
    state = LIMITER_STATE_UNCOMPRESSED
    for sample in stream:  # process all of the (perhaps infinite) samples
        sabs = abs(sample)

        # Update our frame counters
        frames_since_peak += 1
        frames_since_relax += 1

        # Figure out whether the sample is in-range, in-compressed-range,
        # or out-of-range.
        if sabs <= threshold:
            # No compression needed, and no adjustments to make
            pass
        elif sabs * ratio <= threshold:
            # An exact re-peak (ratio nudging will leave it < threshold)
            if sabs == peak_value:
                frames_since_peak = 0
        else:  # out-of-range sample!
            frames_since_peak = 0
            peak_value = sabs
            peak_ratio = round(threshold / peak_value, 8) - 1e-8
            state = LIMITER_STATE_COMPRESSING
            # We nudge the ratio down ever-so-slightly to ensure that
            # floating point weirdness with multiplication still leaves
            # our result value strictly <= the threshold.
            ratio = peak_ratio

        # Yield one sample at the current ratio
        yield sample * ratio

        # Check for delay-based state transitions
        if (
            state == LIMITER_STATE_COMPRESSING
        and frames_since_peak >= release_delay
        ):
            # Enter relaxation period after release_delay
            frames_since_relax = 0
            state = LIMITER_STATE_RELAXING
        elif (
            state == LIMITER_STATE_RELAXING
        and frames_since_relax >= release_duration
        ):
            # Return to uncompressed state after relax timeout
            ratio = 1.0
            state = LIMITER_STATE_UNCOMPRESSED

        # Update compression ratio during relaxation period
        if state == LIMITER_STATE_RELAXING:
            if release_duration > 0:
                progress = frames_since_relax / release_duration
                ratio = peak_ratio * (1 - progress) + progress
            else:
                # Nothing to interpolate over: relaxation is instant.
                ratio = 1.0
                state = LIMITER_STATE_UNCOMPRESSED

        # And now the loop continues...


def bytestream(stream):
    """
    Converts a stream into a byte stream with SAMPLE_WIDTH bytes per
    sample. Yields bytes objects of length equal to the sample width
    (signed, little-endian). Samples are expected to be between -1 and
    1; larger values won't fit in the sample width.
    """
    limit = 2**(8*SAMPLE_WIDTH - 1) - 1
    for sample in stream:
        quantized = int(sample * limit)
        yield quantized.to_bytes(SAMPLE_WIDTH, byteorder='little', signed=True)


def bytestring(signal, duration, start=0):
    """
    Turns a finite timespan of the given signal into a bytes object.
    Runs from t=start to t=(start + duration), where start defaults to
    0. The samples are passed through the limiter (with a threshold of
    1.0 and the default delay and relax duration) before conversion.
    """
    return b''.join(
        bytestream(
            limited_stream(
                stream(signal, start=start, end=start + duration),
                1.0,
                DEFAULT_LIMITER_DELAY,
                DEFAULT_LIMITER_RELAX_DURATION,
            )
        )
    )


#--------#
# Tracks #
#--------#

def _newTrack():
    """
    Returns a fresh, empty track dictionary: a silent waveform, zero
    duration, and an empty log.
    """
    return {
        "waveform": silence,
        "duration": 0,
        "log": []
    }


TRACKS = {
    "default": _newTrack()
}
"""
The various active tracks. Each track name maps to a dictionary
containing the waveform function for that track and a time value
representing the current end-time of the track. The track dictionary
also has a log that contains a list of tuples which logs each note,
rest, or beat that gets added to the track using addNote, addRest, or
addBeat. Each log entry is a triple recording the time at which the
note/rest/beat begins, the duration of the note/rest/beat, and a string
describing the note/rest/beat. When tracks are mixed, log entries are
sorted by start time, then duration, and then description.
"""

ACTIVE_TRACK = "default"
"""
The currently-active track. By default, this is the track named
'default'.
"""

_FUNCTION_TYPE = type(lambda: 0)
"""
The type of plain Python functions, which instruments must be.
"""


def setActiveTrack(name):
    """
    Creates a new track with the given name and switches to that track,
    or switches to the existing track with that name if there is one.
    """
    global ACTIVE_TRACK
    if name not in TRACKS:
        TRACKS[name] = _newTrack()
    ACTIVE_TRACK = name


def eraseTrack():
    """
    Completely erases the current track, erasing any old contents and
    setting the duration back to zero.
    """
    TRACKS[ACTIVE_TRACK] = _newTrack()


def _resolveStart(track, startAt):
    """
    Converts a start-time argument into a number of seconds: the string
    'end' becomes the track's current duration. Raises a TypeError for
    anything else that isn't a number.
    """
    if startAt == "end":
        return track['duration']
    if not isinstance(startAt, (int, float)):
        raise TypeError("The start time must be a number or the string 'end'.")
    return startAt


def _resolveInstrument(instrument, minArgs, tooFewArgsTemplate):
    """
    Converts an instrument argument (a function, or the name of a
    function in this module) into an instrument function that accepts at
    least minArgs positional arguments. Raises a TypeError otherwise;
    the template for the too-few-arguments message is formatted with the
    function's name and its positional argument count.
    """
    if isinstance(instrument, str):
        name = instrument
        instrument = globals().get(name)
        # The name must refer to a function, not just to any global.
        if not isinstance(instrument, _FUNCTION_TYPE):
            raise TypeError("'{}' is not a valid instrument name.".format(name))
    elif not isinstance(instrument, _FUNCTION_TYPE):
        raise TypeError(
            (
                "The instrument must be an instrument name string or an"
              + " instrument function (got: {})."
            ).format(repr(instrument))
        )

    if instrument.__code__.co_argcount < minArgs:
        raise TypeError(
            tooFewArgsTemplate.format(
                instrument.__name__,
                instrument.__code__.co_argcount
            )
        )

    return instrument


def _addSound(track, startAt, duration, signal, description):
    """
    Stacks the given signal onto the given track at the given time,
    logs it using the given description, and extends the track's
    duration if necessary. If the sound starts within the part of the
    track that has already been rendered, the stale render (and any wave
    object built from it) is discarded so that the next render starts
    from scratch.
    """
    rendered = track.get('rendered')
    if rendered is not None and startAt < rendered[1]:
        del track['rendered']
        track.pop('wave_object', None)

    track['waveform'] = stack(track['waveform'], signal, startAt, duration)
    track['log'].append((startAt, duration, description))
    track['duration'] = max(track['duration'], startAt + duration)


def addRest(startAt, duration):
    """
    Adds a period of silence to the currently active track. If the
    'startAt' argument is the string 'end' instead of a number (in
    seconds), the rest will be added to the end of the track. In
    addition to potentially moving the track's end-time forward, this
    will add a rest entry to the track's log, but it doesn't actually
    change the track's signal, and if notes are added on top of the
    rest, or if the rest is added on top of the notes, those notes will
    still play.

    Raises a TypeError if the start time or duration is invalid.
    """
    thisTrack = TRACKS[ACTIVE_TRACK]
    startAt = _resolveStart(thisTrack, startAt)

    if not isinstance(duration, (int, float)):
        raise TypeError("The duration must be a number.")

    # No need to alter the waveform, which is silent outside of active
    # notes in any case.
    thisTrack['log'].append(
        (
            startAt,
            duration,
            "a {:0.3g}s rest".format(duration)
        )
    )
    restEnd = startAt + duration
    thisTrack['duration'] = max(thisTrack['duration'], restEnd)


def addNote(startAt, instrument, pitch, duration):
    """
    Adds a note at the given moment using the given instrument at the
    given pitch for the given duration to the currently active track.
    The first argument should be a number of seconds, but may also be
    the string "end" to start the note at the end of the current track.
    The second argument must be a function which accepts pitch and
    duration as parameters in that order, examples include `keyboard`
    and `harmonica`, or it may be the name of such a function in this
    module as a string. Note that pitchless instruments should be
    handled via `addBeat` instead.

    Raises a TypeError if any of the arguments is invalid.
    """
    thisTrack = TRACKS[ACTIVE_TRACK]
    startAt = _resolveStart(thisTrack, startAt)
    instrument = _resolveInstrument(
        instrument,
        2,
        (
            "The instrument function must accept pitch and duration"
          + " arguments (function '{}' only accepts {} positional"
          + " arguments)."
        )
    )

    if not isinstance(pitch, (int, float)):
        raise TypeError(
            "The pitch must be a number (you may use the numerical"
          + " constants included in this module like P1 and C4)."
        )

    if not isinstance(duration, (int, float)):
        raise TypeError("The duration must be a number.")

    _addSound(
        thisTrack,
        startAt,
        duration,
        instrument(pitch, duration),
        "a {:0.3g}s {} note at {:.3g} Hz".format(
            duration,
            instrument.__name__,
            pitch
        )
    )


def addBeat(startAt, instrument, duration):
    """
    Adds a beat at the given time using the given instrument for the
    given duration to the currently active track. The first argument
    should be a number in seconds, or it may be the string 'end' in
    which case the beat will be added to the end of the current track.
    The second argument must be a function which accepts just duration
    as a parameter, examples include `snare` and `kick`, or it may be
    the name of such a function in this module as a string. Note that
    pitched instruments should be handled via `addNote` instead.

    Raises a TypeError if any of the arguments is invalid.
    """
    thisTrack = TRACKS[ACTIVE_TRACK]
    startAt = _resolveStart(thisTrack, startAt)
    instrument = _resolveInstrument(
        instrument,
        1,
        (
            "The instrument function must accept a duration"
          + " argument (function '{}' doesn't accept any positional"
          + " arguments)."
        )
    )

    if not isinstance(duration, (int, float)):
        raise TypeError("The duration must be a number.")

    _addSound(
        thisTrack,
        startAt,
        duration,
        instrument(duration),
        "a {:0.3g}s {} beat".format(duration, instrument.__name__)
    )


def trackDuration():
    """
    Returns the duration of the current track, in seconds.
    """
    track = TRACKS[ACTIVE_TRACK]
    return track['duration']


def mixTracks(track1, track2, newName):
    """
    Creates a new track with the given new name, and mixes the two
    provided tracks (given by name) into the new track. Note that this
    may cause out-of-range values to occur which will be handled during
    limiting. The name for the new track must not already be used by an
    existing track.

    Raises a ValueError if the new name is taken or if either of the
    tracks to mix doesn't exist.
    """
    if newName in TRACKS:
        raise ValueError(
            "Track '{}' already exists: cannot mix tracks.".format(newName)
        )

    if track1 not in TRACKS:
        raise ValueError("Can't mix tracks: no track name '{}'.".format(track1))

    if track2 not in TRACKS:
        raise ValueError("Can't mix tracks: no track name '{}'.".format(track2))

    track1 = TRACKS[track1]
    track2 = TRACKS[track2]

    wf1 = track1['waveform']
    wf2 = track2['waveform']

    TRACKS[newName] = {
        "waveform": mix(wf1, wf2),
        "duration": max(track1['duration'], track2['duration']),
        "log": sorted(track1['log'] + track2['log'])
    }


def renderTrack(track):
    """
    Renders the given track (a track dictionary), adding a 'rendered'
    key that contains a tuple of a bytestring with the track data, and a
    duration value indicating how long the render is. If there's an
    existing render, only the part of the track after the end of that
    will be rendered, and this will be added onto what is already there.
    Accordingly, functions that edit existing parts of a track should
    remove the track's 'rendered' entry.

    This function will construct a WaveObject for the track as well if
    the simpleaudio module is available.
    """
    _debug("Rendering track...")
    if 'rendered' in track:
        _debug("  ...incrementally...")
        partial, finishedUntil = track['rendered']
    else:
        _debug("  ...from scratch...")
        partial = b''
        finishedUntil = 0

    trackEnd = track['duration']

    if trackEnd == finishedUntil:
        _debug("  ...nothing new to render.")
        return  # no work needs to be done; the existing render is up-to-date

    _debug("  ...rendering from {} to {}...".format(finishedUntil, trackEnd))
    newData = bytestring(
        track['waveform'],
        trackEnd - finishedUntil,
        finishedUntil
    )
    _debug("  ...compiled {} bytes...".format(len(newData)))

    data = partial + newData

    track['rendered'] = (data, trackEnd)
    _debug("  ...stored new render data.")

    # Pre-construct a WaveObject if simpleaudio is available.
    if simpleaudio is not None:
        _debug("Creating wave object...")
        _debug(
            (
                "We have {} bytes of data ({} seconds of audio at {}"
              + " bytes/frame and {} frames/second)."
            ).format(
                len(data),
                round((len(data)/SAMPLE_WIDTH) / SAMPLE_RATE, 3),
                SAMPLE_WIDTH,
                SAMPLE_RATE
            )
        )
        if len(data) > 0:
            track['wave_object'] = simpleaudio.WaveObject(
                data,
                1,
                SAMPLE_WIDTH,
                SAMPLE_RATE
            )
        else:
            track['wave_object'] = None
        _debug("  ...done.")


def saveTrack(filename):
    """
    Saves the currently active track in .wav format in the given
    filename. Overwrites that file if it already exists!
    """
    track = TRACKS[ACTIVE_TRACK]
    renderTrack(track)  # make sure render is up-to-date
    rendered = track.get('rendered')
    # A zero-duration track never gets a render; save it as an empty file.
    if rendered is None:
        tdata = b''
    else:
        tdata = rendered[0]

    with wave.open(filename, mode='wb') as fout:
        fout.setnchannels(1)
        fout.setsampwidth(SAMPLE_WIDTH)
        fout.setframerate(SAMPLE_RATE)
        fout.writeframes(tdata)


class ZeroLengthTrack:
    """
    Object for representing a 0-duration track. Supports same operations
    as a WaveObject.play() result.
    """
    def wait_done(self):
        """
        Returns immediately, because the track's duration is 0.
        """
        return
TODO: Watch out for gaps? """ thisTrack = TRACKS[ACTIVE_TRACK] skip = 0 log = thisTrack["log"] first = True lastSkip = False for i, entry in enumerate(log): if skip > 0: lastSkip = True skip -= 1 continue describe = [ entry ] start, duration, description = entry now = start until = start + duration offset = 1 while now < until and i + offset < len(log): nextEntry = log[i + offset] nextStart, nextDuration, nextDescription = nextEntry if nextStart < until: # an overlapping note until = max(until, nextStart + nextDuration) describe.append(nextEntry) now = nextStart + nextDuration offset += 1 if len(describe) == 1: if first: first = False print(describe[0][2]) elif lastSkip: lastSkip = False print("at {:.3g}s".format(describe[0][0]), describe[0][2]) else: print("and", describe[0][2]) else: first = False skip = len(describe) - 1 for start, duration, description in describe: if not description.endswith("rest"): print("at {:.3g}s".format(start), description) # overlapping rests are not described def prepareTrack(): """ Force the current track to be rendered, so that playback with playTrack can start immediately instead of needing to render the track which may take some time. """ renderTrack(TRACKS[ACTIVE_TRACK]) def playTrack(wait=True): """ Plays the currently active track. By default, this also waits until the track is done playing, but if wait is set to False, playback is asynchronous and starts as soon as the track is finished rendering. This function only works if the `simpleaudio` package is available, and raises a NotImplementedError if it isn't. When playing asynchronously, it returns a `WaveObject` instance, which has a `wait_done` method that can be used to wait until the playback is finished. When playing synchronously, it returns None. Note that there may be some processing delay before the track starts playing; call `prepareTrack` beforehand to get the processing out of the way earlier. 
""" if simpleaudio is None: raise NotImplementedError( "You must have the `simpleaudio` package installed in order" + " to play sounds.\n" + "You should be able to use your package manager (or maybe" + " `pip`) to install the `simpleaudio` package." ) track = TRACKS[ACTIVE_TRACK] renderTrack(track) # make sure render is up-to-date wo = track.get('wave_object') if wo == None: result = ZeroLengthTrack() else: result = wo.play() if wait: result.wait_done() return None else: return result #---------# # Testing # #---------# def tuneTest(): """ A test function that produces a drums-and-keyboard two-track tune, prints the log, and plays it. """ setActiveTrack("drums") for i in range(3): addBeat('end', snare, 0.1) addRest('end', 0.1) addBeat('end', snare, 0.1) addRest('end', 0.1) addBeat('end', kick, 0.2) addBeat('end', snare, 0.1) addRest('end', 0.1) for i in range(8): addBeat('end', snare, 0.1) for i in range(5): addBeat('end', snare, 0.1) addRest('end', 0.1) addRest('end', 0.2) addBeat('end', kick, 0.2) addBeat('end', snare, 0.1) addRest('end', 0.1) addRest('end', 0.2) addBeat('end', snare, 0.2) setActiveTrack("tones") addRest('end', 0.8 * 4) # wait for drums #for freq in (C4, D4, E4, G4, A4, C5, A4, G4, E4, D4, C4): for freq in (P4, P5, P6, P7, P8, P9, P8, P7, P6, P5, P4): #addNote('end', beep, freq, 0.2) addNote('end', keyboard, freq, 0.2) addRest('end', 0.2) #addNote('end', harmonica, freq, 0.2) mixTracks("tones", "drums", "mixed") setActiveTrack("mixed") printTrack() playTrack() def polyTest(): """ A simple test for polyphonic music in a single track, relying on the limiter to handle compression. """ t = 0 for lower, higher in ((P3, P4), (P3, P5), (P4, P6), (P6, P7), (P7, P8)): addNote(t, keyboard, lower, 0.3) addNote(t, keyboard, higher, 0.3) t += 0.2 printTrack() playTrack() def limitTest(): """ A test designed to exercise the limiter. 
""" t = 0 addNote(t, keyboard, P5, 0.5) addNote(t, keyboard, P5, 0.5) addNote(t, keyboard, P5, 0.5) t += 0.5 addRest(t, 0.25) t += 0.25 addNote(t, keyboard, P5, 0.5) addNote(t, keyboard, P5, 0.5) t += 0.5 addRest(t, 0.25) t += 0.25 addNote(t, keyboard, P5, 0.5) printTrack() playTrack() if __name__ == "__main__": #tuneTest() polyTest() #limitTest()