I have a USRobotics 56K USB modem connected to a Linux system. I downloaded the sample Python code for playing and recording audio through the modem from the USRobotics Sample Code page. I have tested both scripts and they work flawlessly. However, I have not been able to find an answer to the following question anywhere on the internet, so I am asking here: is it possible to record audio while simultaneously playing a different WAV file on a USRobotics modem using the Python code below?
Playing Audio File:
import serial
import time
import threading
import atexit
import sys
import re
import wave
# Shared serial-port object used by every function in this script.
# It is only configured here; init_modem_settings() is what opens it.
analog_modem = serial.Serial()
analog_modem.port = "/dev/ttyACM0"
analog_modem.baudrate = 57600 # alternative value left by the author: 9600
analog_modem.bytesize = serial.EIGHTBITS # number of data bits per byte
analog_modem.parity = serial.PARITY_NONE # no parity check
analog_modem.stopbits = serial.STOPBITS_ONE # number of stop bits
analog_modem.timeout = 3 # read timeout (seconds, per pySerial): reads wait up to this long, they are not non-blocking
analog_modem.xonxoff = False # disable software flow control
analog_modem.rtscts = False # disable hardware (RTS/CTS) flow control
analog_modem.dsrdtr = False # disable hardware (DSR/DTR) flow control
analog_modem.writeTimeout = 3 # timeout for write
# Gate for the background listener: read_data() only reads from the port
# while this is False. exec_AT_cmd() and play_audio() set it to True while
# they are using the port themselves and back to False when they finish.
disable_modem_event_listener = True
# read_data() calls play_audio() once the accumulated "RING" count equals
# this value.
RINGS_BEFORE_AUTO_ANSWER = 2
#=================================================================
# Initialize Modem
#=================================================================
def init_modem_settings():
    """Open the serial port and send the start-up AT command sequence.

    The commands AT, ATZ3, ATV1, ATE1 and AT+VCID=1 are sent in that order.
    A command that is not acknowledged only prints its error message; the
    remaining commands are still sent.

    Exits the process with status 1 when the port cannot be opened or when
    an exception is raised while initialising (the original exited with
    status 0, which reported success to the calling shell).
    """
    # Open Serial Port
    try:
        analog_modem.open()
    except Exception:
        print("Error: Unable to open the Serial Port.")
        sys.exit(1)

    # (AT command, message printed when the command is not acknowledged)
    setup_steps = (
        # Test Modem connection, using basic AT command.
        ("AT", "Error: Unable to access the Modem"),
        # reset to factory default.
        ("ATZ3", "Error: Unable reset to factory default"),
        # Display result codes in verbose form
        ("ATV1", "Error: Unable set response in verbose form"),
        # Enable Command Echo Mode.
        ("ATE1", "Error: Failed to enable Command Echo Mode"),
        # Enable formatted caller report.
        ("AT+VCID=1", "Error: Failed to enable formatted caller report."),
    )

    # Initialize
    try:
        # Drop anything buffered before and after the command sequence so
        # stale bytes are not mistaken for replies or events.
        analog_modem.flushInput()
        analog_modem.flushOutput()
        for at_cmd, failure_msg in setup_steps:
            if not exec_AT_cmd(at_cmd):
                print(failure_msg)
        analog_modem.flushInput()
        analog_modem.flushOutput()
    except Exception:
        print("Error: unable to Initialize the Modem")
        sys.exit(1)
#=================================================================
#=================================================================
# Execute AT Commands on the Modem
#=================================================================
def exec_AT_cmd(modem_AT_cmd):
    """Send one AT command and report whether the modem acknowledged it.

    The background listener is paused (via ``disable_modem_event_listener``)
    while the command is written and its reply is read, and is always
    re-enabled before returning.

    Args:
        modem_AT_cmd: The command text without the trailing carriage return.

    Returns:
        True when the reply contains "OK", or when the command is AT+VTX or
        AT+VRX and the reply contains "CONNECT". False otherwise, including
        when writing to or reading from the port raises (the original
        returned an empty tuple in that case).
    """
    global disable_modem_event_listener
    disable_modem_event_listener = True
    try:
        cmd = modem_AT_cmd + "\r"
        analog_modem.write(cmd.encode())
        # The reply is taken to be the next two lines from the port.
        # NOTE(review): presumably the command echo followed by the result
        # code - confirm against the modem's actual output.
        modem_response = analog_modem.readline()
        modem_response = modem_response + analog_modem.readline()
        print(modem_response)
    except Exception:
        print("Error: unable to write AT command to the modem...")
        return False
    finally:
        disable_modem_event_listener = False

    if modem_AT_cmd in ("AT+VTX", "AT+VRX") and "CONNECT" in modem_response:
        # modem in TAD mode
        return True
    # Successful command execution is signalled by "OK" in the reply
    return "OK" in modem_response
#=================================================================
#=================================================================
# Recover Serial Port
#=================================================================
def recover_from_error():
    """Best-effort recovery: hang up, close the port, re-initialise, hang up.

    Every step is attempted even if an earlier one fails. The original also
    ran an unguarded ``analog_modem.close()`` / ``init_modem_settings()``
    pair before the guarded ones; that duplicated the reset and let a single
    failure abort the whole recovery, so it has been removed.
    """
    try:
        exec_AT_cmd("ATH")
    except Exception:
        pass  # the port may already be unusable; keep going
    try:
        analog_modem.close()
    except Exception:
        pass  # closing is only a precondition for the re-open below
    try:
        init_modem_settings()
    except (Exception, SystemExit):
        # init_modem_settings() calls sys.exit() on failure. The original
        # bare ``except`` swallowed that as well, so recovery never
        # terminated the caller; keep that behaviour explicitly.
        pass
    try:
        exec_AT_cmd("ATH")
    except Exception:
        pass
#=================================================================
#=================================================================
# Play wav file
#=================================================================
def play_audio():
    """Play ``sample.wav`` through the modem and then hang up.

    Sends AT+FCLASS=8, AT+VSM=128,8000, AT+VLS=1 and AT+VTX, streams the
    wave file to the serial port in 1024-frame chunks, terminates the
    stream with <DLE><ETX>, waits up to two minutes for "OK" and finally
    sends ATH. Returns early (after printing an error) when one of the
    set-up commands is not acknowledged.

    Fixes relative to the original:
      * the end-of-stream marker is sent as the two bytes 0x10 0x03 instead
        of the literal text "<DLE><ETX>";
      * audio bytes equal to DLE (0x10) are doubled so they are not taken
        as the start of a control sequence (DLE shielding, ITU-T V.253);
      * the wave file is closed and the listener re-enabled even when
        playback raises;
      * a failed AT+VTX hangs up instead of leaving the line selected.
    """
    global disable_modem_event_listener

    DLE = chr(16)
    ETX = chr(3)

    print("Play Audio Msg - Start")
    # Enter Voice Mode
    if not exec_AT_cmd("AT+FCLASS=8"):
        print("Error: Failed to put modem into voice mode.")
        return
    # Compression Method and Sampling Rate Specifications
    # Compression Method: 8-bit linear / Sampling Rate: 8000 Hz
    if not exec_AT_cmd("AT+VSM=128,8000"):
        print("Error: Failed to set compression method and sampling rate specifications.")
        return
    # Put modem into TAD Mode
    if not exec_AT_cmd("AT+VLS=1"):
        print("Error: Unable put modem into TAD mode.")
        return
    # Start voice transmit; exec_AT_cmd() treats "CONNECT" as success here
    if not exec_AT_cmd("AT+VTX"):
        print("Error: Unable put modem into voice transmit mode.")
        # AT+VLS=1 already succeeded, so release the line before giving up
        exec_AT_cmd("ATH")
        return
    time.sleep(1)

    # Play Audio File
    # Keep the background listener away from the port while streaming.
    disable_modem_event_listener = True
    try:
        wf = wave.open('sample.wav', 'rb')
        try:
            chunk = 1024
            data = wf.readframes(chunk)
            # readframes() returns an empty string at end of file
            while data:
                analog_modem.write(data.replace(DLE, DLE + DLE))
                data = wf.readframes(chunk)
                # You may need to change this sleep interval to smooth-out the audio
                time.sleep(.12)
        finally:
            wf.close()
    finally:
        # Leave transmit mode even if playback failed part-way through.
        analog_modem.write(DLE + ETX)
        # 2 Min Time Out
        timeout = time.time() + 60*2
        while 1:
            modem_data = analog_modem.readline()
            if "OK" in modem_data:
                break
            if time.time() > timeout:
                break
        disable_modem_event_listener = False
        cmd = "ATH" + "\r"
        analog_modem.write(cmd.encode())
    print("Play Audio Msg - END")
    return
#=================================================================
#=================================================================
# Modem Data Listener
#=================================================================
def read_data():
    """Poll the modem forever and react to busy, silence and RING lines.

    Runs until the process ends. While ``disable_modem_event_listener`` is
    True the port is left alone; the loop now sleeps briefly in that state
    instead of spinning at full CPU as the original did.

    For each non-empty line read from the modem:
      * if, after stripping DLE characters from both ends, the line consists
        only of "b" characters, the call is terminated with ATH (busy tone);
      * if the stripped line equals "s", the call is terminated with ATH
        (silence);
      * "RING" lines are accumulated and ``play_audio()`` is called when the
        count reaches ``RINGS_BEFORE_AUTO_ANSWER``.
    When ATH is not acknowledged, ``recover_from_error()`` is invoked.
    """
    global disable_modem_event_listener
    ring_data = ""
    while 1:
        if disable_modem_event_listener:
            # Another routine owns the serial port right now.
            time.sleep(0.1)
            continue

        modem_data = analog_modem.readline()
        if modem_data == "":
            # readline() timed out without data
            continue
        print(modem_data)

        # Same line with any leading/trailing DLE (chr(16)) removed
        stripped = modem_data.strip(chr(16))

        if "b" in stripped:
            # Diagnostic output kept from the original script
            print("b in modem data")
            print("b count:")
            print(stripped.count("b"))
            print("total length:")
            print(len(stripped))
            print(modem_data)
            if stripped.count("b") == len(stripped):
                print("all Bs in mode data")
                # Terminate the call
                if not exec_AT_cmd("ATH"):
                    print("Error: Busy Tone - Failed to terminate the call")
                    print("Trying to revoer the serial port")
                    recover_from_error()
                else:
                    print("Busy Tone: Call Terminated")

        if "s" == stripped:
            # Terminate the call
            if not exec_AT_cmd("ATH"):
                print("Error: Silence - Failed to terminate the call")
                print("Trying to revoer the serial port")
                recover_from_error()
            else:
                print("Silence: Call Terminated")

        if ("RING" in modem_data) or ("DATE" in modem_data) or ("TIME" in modem_data) or ("NMBR" in modem_data):
            if "RING" in stripped:
                ring_data = ring_data + modem_data
                ring_count = ring_data.count("RING")
                if ring_count == 1:
                    print(modem_data)
                elif ring_count == RINGS_BEFORE_AUTO_ANSWER:
                    ring_data = ""
                    play_audio()
#=================================================================
#=================================================================
# Close the Serial Port
#=================================================================
def close_modem_port():
    """Hang up any active call, then close the serial port if it is open.

    A failure to hang up is ignored. A failure while checking or closing
    the port prints an error and calls ``sys.exit()``.
    """
    # Best-effort hang-up before releasing the port
    try:
        exec_AT_cmd("ATH")
    except:
        pass

    try:
        port_is_open = analog_modem.isOpen()
        if not port_is_open:
            return
        analog_modem.close()
        print("Serial Port closed...")
    except:
        print("Error: Unable to close the Serial Port.")
        sys.exit()
#=================================================================
# Script entry point: open and initialise the modem first, because the
# listener below reads from the same serial port.
init_modem_settings()
# Start a new thread to listen to modem data.
# read_data() loops forever (``while 1`` with no exit path) and the thread is
# created without ``daemon`` being set.
# NOTE(review): presumably this keeps the process alive after the main
# thread finishes - confirm that is the intended lifetime.
data_listener_thread = threading.Thread(target=read_data)
data_listener_thread.start()
# Close the Modem Port when the program terminates
atexit.register(close_modem_port)
Recording Audio:
import serial
import time
import threading
import atexit
import sys
import re
import wave
from datetime import datetime
import os
import fcntl
import subprocess
# read_data() starts recording when the accumulated "RING" count equals this
# value. It tests ``ring_count == 1`` first (and does nothing), so a value of
# 1 would never trigger an answer.
RINGS_BEFORE_AUTO_ANSWER = 2 #Must be greater than 1
# Upper bound used by read_AT_cmd_response() while waiting for a reply.
MODEM_RESPONSE_READ_TIMEOUT = 120 #Time in Seconds (Default 120 Seconds)
# Substring that reset_USB_Device() searches for in the ``lsusb`` output.
MODEM_NAME = 'U.S. Robotics' # Modem Manufacturer, For Ex: 'U.S. Robotics' if the 'lsusb' cmd output is similar to "Bus 001 Device 004: ID 0baf:0303 U.S. Robotics"
# Record Voice Mail Variables
# record_audio() stops capturing once this many seconds have elapsed.
REC_VM_MAX_DURATION = 120 # Time in Seconds
# Gate for the event listener: read_data() only reads from the port while
# this is False; exec_AT_cmd(), record_audio() and recover_from_error() set
# it to True while they use the port themselves.
disable_modem_event_listener = True
# Global Modem Object (configured by set_COM_port_settings(), opened by
# detect_COM_port())
analog_modem = serial.Serial()
# Name of the .wav file for the next recording; set by read_data() before it
# calls record_audio(), which clears it again after saving.
audio_file_name = ''
#=================================================================
# Set COM Port settings
#=================================================================
def set_COM_port_settings(com_port):
    """Apply the fixed serial configuration to ``analog_modem``.

    Args:
        com_port: Device path to assign to the serial object's ``port``.

    The attributes are assigned in the same order as listed below; the port
    itself is not opened here.
    """
    port_settings = (
        ("port", com_port),
        ("baudrate", 57600),                   # other values noted by the author: 9600, 115200
        ("bytesize", serial.EIGHTBITS),        # number of data bits per byte
        ("parity", serial.PARITY_NONE),        # no parity check
        ("stopbits", serial.STOPBITS_ONE),     # number of stop bits
        ("timeout", 3),                        # read timeout
        ("xonxoff", False),                    # disable software flow control
        ("rtscts", False),                     # disable hardware (RTS/CTS) flow control
        ("dsrdtr", False),                     # disable hardware (DSR/DTR) flow control
        ("writeTimeout", 3),                   # timeout for write
    )
    for attribute, value in port_settings:
        setattr(analog_modem, attribute, value)
#=================================================================
#=================================================================
# Detect the Modem COM Port
#=================================================================
def detect_COM_port():
    """Find the serial device that answers as a voice modem and keep it open.

    Every path matching ``/dev/tty[A-Za-z]*`` is configured with
    ``set_COM_port_settings()``, opened, and probed with AT+FCLASS=8. The
    first device that acknowledges the command is left open on
    ``analog_modem`` with its buffers flushed; devices that do not are
    closed again.

    The candidate list now comes from ``glob`` instead of running ``ls``
    through a shell, so no subprocess or shell is involved.

    Returns:
        True when a modem was found, False otherwise. (The original returned
        None in both cases, so callers that ignore the result are unaffected.)
    """
    # Function-scope import keeps this block self-contained.
    import glob

    # Sorted to keep a stable probing order, as ``ls`` output was sorted.
    for com_port in sorted(glob.glob('/dev/tty[A-Za-z]*')):
        # Try to open the COM Port and execute AT Command
        try:
            # Set the COM Port Settings
            set_COM_port_settings(com_port)
            analog_modem.open()
        except Exception:
            print("Unable to open COM Port: " + com_port)
            continue

        # Try to put Modem in Voice Mode
        if exec_AT_cmd("AT+FCLASS=8", "OK"):
            # Found the COM Port exit the loop
            print("Modem COM Port is: " + com_port)
            analog_modem.flushInput()
            analog_modem.flushOutput()
            return True

        print("Error: Failed to put modem into voice mode.")
        if analog_modem.isOpen():
            analog_modem.close()

    print("Error: No voice modem found on any serial port.")
    return False
#=================================================================
#=================================================================
# Initialize Modem
#=================================================================
def init_modem_settings():
    """Locate the modem, open its port and send the start-up AT commands.

    ``detect_COM_port()`` is expected to leave ``analog_modem`` open on the
    modem's device. If it raises, or returns with the port still closed,
    the process exits with status 1. The original only handled the raising
    case and then failed later with a less specific message.

    The commands AT, ATZ3, ATV1, ATE1 and AT+VCID=1 are then sent in that
    order. A command that is not acknowledged only prints its error message.
    An exception during this phase prints an error and exits with status 1
    (the original exited with status 0).
    """
    # Detect and Open the Modem Serial COM Port
    try:
        detect_COM_port()
        port_ready = analog_modem.isOpen()
    except Exception:
        port_ready = False
    if not port_ready:
        print("Error: Unable to open the Serial Port.")
        sys.exit(1)

    # (AT command, message printed when the command is not acknowledged)
    setup_steps = (
        # Test Modem connection, using basic AT command.
        ("AT", "Error: Unable to access the Modem"),
        # reset to factory default.
        ("ATZ3", "Error: Unable reset to factory default"),
        # Display result codes in verbose form
        ("ATV1", "Error: Unable set response in verbose form"),
        # Enable Command Echo Mode.
        ("ATE1", "Error: Failed to enable Command Echo Mode"),
        # Enable formatted caller report.
        ("AT+VCID=1", "Error: Failed to enable formatted caller report."),
    )

    # Initialize the Modem
    try:
        # Flush any existing input/output data from the buffers
        analog_modem.flushInput()
        analog_modem.flushOutput()
        for at_cmd, failure_msg in setup_steps:
            if not exec_AT_cmd(at_cmd):
                print(failure_msg)
        # Flush any existing input/output data from the buffers
        analog_modem.flushInput()
        analog_modem.flushOutput()
    except Exception:
        print("Error: unable to Initialize the Modem")
        sys.exit(1)
#=================================================================
#=================================================================
# Reset Modem
#=================================================================
def reset_USB_Device():
    """Issue a USB reset to the modem and re-initialise it.

    The serial port is closed, the modem's ``/dev/bus/usb/<bus>/<dev>`` path
    is derived from the ``lsusb`` line containing ``MODEM_NAME``, the
    USBDEVFS_RESET ioctl is issued on it, and ``init_modem_settings()`` is
    called.

    Unlike the original, a modem that is missing from the ``lsusb`` output
    (which made ``os.open("")`` raise) and failures of ``lsusb``, ``open``
    or ``ioctl`` are reported and skipped, so the re-initialisation still
    runs instead of the recovery crashing.
    """
    # Close the COM Port if it's open
    try:
        if analog_modem.isOpen():
            analog_modem.close()
    except Exception:
        pass  # a port that cannot be closed must not block the reset

    # Equivalent of the _IO('U', 20) constant in the linux kernel.
    USBDEVFS_RESET = ord('U') << (4*2) | 20
    dev_path = ""

    try:
        # Based on the 'lsusb' command, get the usb device path in the
        # following format - /dev/bus/usb/<busnum>/<devnum>
        proc = subprocess.Popen(['lsusb'], stdout=subprocess.PIPE)
        out = proc.communicate()[0]
        for line in out.split('\n'):
            if MODEM_NAME in line:
                # e.g. "Bus 001 Device 004: ID 0baf:0303 U.S. Robotics"
                parts = line.split()
                bus = parts[1]
                dev = parts[3][:3]  # drop the trailing ':' after the device number
                dev_path = '/dev/bus/usb/%s/%s' % (bus, dev)

        if not dev_path:
            print("Error: USB device '" + MODEM_NAME + "' not found; skipping USB reset.")
        else:
            # Reset the USB Device
            fd = os.open(dev_path, os.O_WRONLY)
            try:
                fcntl.ioctl(fd, USBDEVFS_RESET, 0)
                print("Modem reset successful")
            finally:
                os.close(fd)
    except (OSError, IOError) as err:
        print("Error: Unable to reset the USB device: " + str(err))

    # Re-initialize the Modem
    init_modem_settings()
#=================================================================
#=================================================================
# Execute AT Commands at the Modem
#=================================================================
def exec_AT_cmd(modem_AT_cmd, expected_response="OK"):
    """Write one AT command to the modem and wait for the expected reply.

    The event listener is paused for the duration of the exchange and
    re-enabled before returning.

    Args:
        modem_AT_cmd: Command text; a carriage return is appended here.
        expected_response: Reply that counts as success.

    Returns:
        The result of ``read_AT_cmd_response(expected_response)``, or False
        if writing the command or reading the reply raised.
    """
    global disable_modem_event_listener
    disable_modem_event_listener = True
    try:
        # Send command to the Modem, then read its response
        payload = (modem_AT_cmd + "\r").encode()
        analog_modem.write(payload)
        succeeded = read_AT_cmd_response(expected_response)
    except:
        disable_modem_event_listener = False
        print("Error: Failed to execute the command")
        return False
    disable_modem_event_listener = False
    return succeeded
#=================================================================
#=================================================================
# Read AT Command Response from the Modem
#=================================================================
def read_AT_cmd_response(expected_response="OK"):
    """Read lines from the modem until the command's outcome is known.

    Each line is printed, then compared after stripping spaces, tabs, CR,
    LF and DLE (chr(16)) from both ends.

    Args:
        expected_response: Text a stripped line must equal to count as
            success.

    Returns:
        True when a line equals ``expected_response``. False when a line
        contains "ERROR", when more than ``MODEM_RESPONSE_READ_TIMEOUT``
        seconds have elapsed, or when reading raises.
    """
    began_at = datetime.now()
    padding = ' \t\n\r' + chr(16)
    try:
        while True:
            # Read Modem Data on Serial Rx Pin
            raw_line = analog_modem.readline()
            print(raw_line)
            cleaned = raw_line.strip(padding)
            if cleaned == expected_response:
                # Received expected response
                return True
            if "ERROR" in cleaned:
                # Failed to execute the command successfully
                return False
            waited = (datetime.now() - began_at).seconds
            if waited > MODEM_RESPONSE_READ_TIMEOUT:
                # Timeout
                return False
    except:
        print("Error in read_modem_response function...")
        return False
#=================================================================
#=================================================================
# Recover Serial Port
#=================================================================
def recover_from_error():
    """Reset the modem's USB device with the event listener paused.

    The listener flag is restored in a ``finally`` block. In the original,
    an exception from ``reset_USB_Device()`` left
    ``disable_modem_event_listener`` set to True, so ``read_data()`` would
    never read from the port again.
    """
    global disable_modem_event_listener
    # Stop Global Modem Event listener
    disable_modem_event_listener = True
    try:
        # Reset USB Device
        reset_USB_Device()
    finally:
        # Start Global Modem Event listener
        disable_modem_event_listener = False
#=================================================================
#=================================================================
# Read DTMF Digits
#=================================================================
def dtmf_digits(modem_data):
    """Collect one character per ``/...~`` group found in *modem_data*.

    Args:
        modem_data: Text to scan.

    Returns:
        A string made of the first character of every shortest run of text
        enclosed between "/" and "~", in order of appearance; "" when there
        is no such run.
    """
    groups = re.findall('/(.+?)~', modem_data)
    return "".join(group[0] for group in groups)
#=================================================================
#=================================================================
# Record wav file (Voice Msg/Mail)
#=================================================================
def record_audio():
    """Record the caller's audio into ``audio_file_name`` and hang up.

    Sends the voice set-up commands, starts voice receive with AT+VRX, and
    reads 1024-byte chunks from the port until a busy tone (<DLE>b),
    silence (<DLE>s) or end-of-stream (<DLE><ETX>) marker is seen or
    ``REC_VM_MAX_DURATION`` seconds have elapsed. The chunk that contains
    the marker is not saved. The captured audio is written as a mono,
    8-bit, 8000 Hz wave file; then <DLE>! and ATH are sent.

    Fixes relative to the original:
      * <DLE><ETX> is detected as the bytes 0x10 0x03 rather than as the
        literal text "<DLE><ETX>";
      * a marker split across two consecutive reads is still detected;
      * the wave file is closed, and the modem is told to stop and hang up,
        even if capturing or saving raises;
      * a set-up failure after AT+VLS=1 hangs up before returning;
      * an empty ``audio_file_name`` falls back to a timestamped name
        instead of failing in ``wave.open``;
      * the typo "Failed tp enable" in an error message is corrected.
    """
    global disable_modem_event_listener
    global audio_file_name

    DLE = chr(16)

    print("Record Audio Msg - Start")

    # (AT command, expected reply, message on failure, abort on failure)
    setup_steps = (
        # Enter Voice Mode
        ("AT+FCLASS=8", "OK",
         "Error: Failed to put modem into voice mode.", True),
        # Set speaker volume to normal
        ("AT+VGT=128", "OK",
         "Error: Failed to set speaker volume to normal.", True),
        # Compression Method: 8-bit linear / Sampling Rate: 8000 Hz
        ("AT+VSM=128,8000", "OK",
         "Error: Failed to set compression method and sampling rate specifications.", True),
        # Disables silence detection (Value: 0)
        ("AT+VSD=128,0", "OK",
         "Error: Failed to disable silence detection.", True),
        # Put modem into TAD Mode
        ("AT+VLS=1", "OK",
         "Error: Unable put modem into TAD mode.", True),
        # Enable silence detection: normal sensitivity, 5 s interval
        ("AT+VSD=128,50", "OK",
         "Error: Failed to enable silence detection.", True),
        # Play beep; recording continues even if the beep fails
        ("AT+VTS=[933,900,100]", "OK",
         "Error: Failed to play 1.2 second beep.", False),
        # Select voice receive mode
        ("AT+VRX", "CONNECT",
         "Error: Unable put modem into voice receive mode.", True),
    )
    line_selected = False
    for at_cmd, expected, failure_msg, fatal in setup_steps:
        if exec_AT_cmd(at_cmd, expected):
            if at_cmd == "AT+VLS=1":
                line_selected = True
            continue
        print(failure_msg)
        if fatal:
            if line_selected:
                # Do not leave the line selected after a failed set-up
                exec_AT_cmd("ATH", "OK")
            return

    # (marker bytes, message printed when the marker is seen)
    end_markers = (
        (DLE + chr(98), "Busy Tone... Call will be disconnected."),
        (DLE + chr(115), "Silence Detected... Call will be disconnected."),
        (DLE + chr(3), "<DLE><ETX> Char Recieved... Call will be disconnected."),
    )

    # Record Audio File
    disable_modem_event_listener = True
    try:
        # Set the auto timeout interval
        start_time = datetime.now()
        CHUNK = 1024
        audio_frames = []
        # Last byte of the previous chunk, so that a two-byte marker split
        # across two reads is still seen.
        carry = ""
        while 1:
            # Read audio data from the Modem
            audio_data = analog_modem.read(CHUNK)
            window = carry + audio_data
            hit = None
            for marker, message in end_markers:
                if marker in window:
                    hit = message
                    break
            if hit is not None:
                print(hit)
                break
            # Timeout
            if (datetime.now() - start_time).seconds > REC_VM_MAX_DURATION:
                print("Timeout - Max recording limit reached.")
                break
            # Add Audio Data to Audio Buffer
            audio_frames.append(audio_data)
            carry = audio_data[-1:]

        # Save the Audio into a .wav file
        file_name = audio_file_name
        if not file_name:
            file_name = datetime.now().strftime("voicemail_%Y%m%d_%H%M%S.wav")
        wf = wave.open(file_name, 'wb')
        try:
            wf.setnchannels(1)
            wf.setsampwidth(1)
            wf.setframerate(8000)
            wf.writeframes(b''.join(audio_frames))
        finally:
            wf.close()
    finally:
        # Reset Audio File Name
        audio_file_name = ''
        # Send End of Voice Receive state by passing "<DLE>!"
        if not exec_AT_cmd((DLE + chr(33)), "OK"):
            print("Error: Unable to signal end of voice receive state")
        # Hangup the Call
        if not exec_AT_cmd("ATH", "OK"):
            print("Error: Unable to hang-up the call")
        # Enable global event listener
        disable_modem_event_listener = False
    print("Record Audio Msg - END")
    return
#=================================================================
#=================================================================
# Data Listener
#=================================================================
def read_data():
    """Poll the modem forever: track caller ID, answer calls and record.

    Runs until the process ends. While ``disable_modem_event_listener`` is
    True the port is left alone (the loop sleeps briefly instead of
    spinning). For each non-empty line read from the modem:
      * a line containing <DLE>b, or equal to <DLE>s, terminates the call
        with ATH and falls back to ``recover_from_error()`` if ATH is not
        acknowledged;
      * NMBR / DATE / TIME lines update the remembered caller-ID fields;
      * RING lines are accumulated; when the count reaches
        ``RINGS_BEFORE_AUTO_ANSWER`` the output file name is built from the
        caller-ID fields plus the current seconds value, and
        ``record_audio()`` is called.

    Fixes relative to the original:
      * the caller-ID fields are initialised, so a call without caller ID
        no longer raises UnboundLocalError when the file name is built;
      * caller-ID values are taken after the "=" sign, so both "NMBR=123"
        and "NMBR = 123" yield "123" (the fixed ``[5:]`` slice kept "= "
        for the spaced form).
    """
    global disable_modem_event_listener
    global audio_file_name

    DLE = chr(16)

    def caller_id_value(line):
        # Value part of a "TAG=value" / "TAG = value" caller-ID line;
        # falls back to the original fixed-offset slice when no "=" exists.
        value = line.partition('=')[2] if '=' in line else line[5:]
        return value.strip()

    ring_data = ""
    from_phone = ''
    call_date = ''
    call_time = ''
    while 1:
        if disable_modem_event_listener:
            # Another routine owns the serial port right now.
            time.sleep(0.1)
            continue

        modem_data = analog_modem.readline()
        if modem_data == "":
            # readline() timed out without data
            continue
        print(modem_data)

        # Check if <DLE>b is in the stream
        if (DLE + chr(98)) in modem_data:
            # Terminate the call
            if not exec_AT_cmd("ATH"):
                print("Error: Busy Tone - Failed to terminate the call")
                print("Trying to revoer the serial port")
                recover_from_error()
            else:
                print("Busy Tone: Call Terminated")

        # Check if <DLE>s is in the stream
        if (DLE + chr(115)) == modem_data:
            # Terminate the call
            if not exec_AT_cmd("ATH"):
                print("Error: Silence - Failed to terminate the call")
                print("Trying to revoer the serial port")
                recover_from_error()
            else:
                print("Silence: Call Terminated")

        # NOTE(review): these compare against the literal texts "-s" and
        # "<DLE>-s" and send the literal text "<DLE>-!"; kept unchanged
        # because the intended byte sequence cannot be determined from here.
        if ("-s".encode() in modem_data) or (("<DLE>-s").encode() in modem_data):
            print("silence found during recording")
            analog_modem.write(("<DLE>-!" + "\r").encode())

        if ("RING" in modem_data) or ("DATE" in modem_data) or ("TIME" in modem_data) or ("NMBR" in modem_data):
            if "NMBR" in modem_data:
                from_phone = caller_id_value(modem_data)
            if "DATE" in modem_data:
                call_date = caller_id_value(modem_data)
            if "TIME" in modem_data:
                call_time = caller_id_value(modem_data)
            if "RING" in modem_data.strip(DLE):
                ring_data = ring_data + modem_data
                ring_count = ring_data.count("RING")
                if ring_count == 1:
                    pass
                elif ring_count == RINGS_BEFORE_AUTO_ANSWER:
                    ring_data = ""
                    audio_file_name = from_phone + "_" + call_date + "_" + call_time + "_" + str(datetime.strftime(datetime.now(),"%S")) + ".wav"
                    from_phone = ''
                    call_date = ''
                    call_time = ''
                    record_audio()
#=================================================================
#=================================================================
# Close the Serial Port
#=================================================================
def close_modem_port():
    """Shut the modem down: hang up, then close the serial port.

    The hang-up is attempted first and any failure there is ignored. If
    checking or closing the port raises, an error is printed and
    ``sys.exit()`` is called.
    """
    # Try to close any active call
    try:
        exec_AT_cmd("ATH")
    except:
        pass

    # Close the Serial COM Port
    try:
        still_open = analog_modem.isOpen()
        if still_open:
            analog_modem.close()
            print("Serial Port closed...")
    except:
        print("Error: Unable to close the Serial Port.")
        sys.exit()
#=================================================================
# Main Function
# Detect the modem's port, open it and send the start-up AT commands.
init_modem_settings()
# Close the Modem Port when the program terminates
atexit.register(close_modem_port)
# Monitor Modem Serial Port.
# read_data() loops forever (``while 1`` with no exit path), so this call
# does not return; it runs in the main thread here, not in a separate one.
read_data()
If so, could you please show me some working code?
from Can an audio be recorded and another audio file be played at the same time on USRobotics 56K Modem?
No comments:
Post a Comment