Python Yamcs Client
- General Client
- Mission Database
- TM/TC Processing
- Archive
- Link Management
- Object Storage
- File Transfer
- Time Correlation (TCO)
- Timeline
- Examples
- alarms.py
- archive_breakdown.py
- archive_retrieval.py
- authenticate.py
- ccsds_completeness.py
- commanding.py
- cop1.py
- file_transfer.py
- links.py
- events.py
- mission_time.py
- packet_subscription.py
- parameter_subscription.py
- plot_with_matplotlib.py
- query_mdb.py
- read_write_parameters.py
- reconnection.py
- timeline.py
- write_mdb.py
Related
Download this Document
commanding.py¶
from time import sleep
from yamcs.client import VerificationConfig, YamcsClient
def issue_command():
    """Send the command that switches battery 1 off and print the result.

    Uses the module-level ``processor`` created in the ``__main__`` section.
    """
    arguments = {"voltage_num": 1}
    issued = processor.issue_command(
        "/YSS/SIMULATOR/SWITCH_VOLTAGE_OFF", args=arguments
    )
    print("Issued", issued)
def issue_command_modify_verification():
    """Send SWITCH_VOLTAGE_OFF with an adjusted verification configuration.

    The "Started" verifier is disabled and the check window of the "Queued"
    verifier is changed before the command is issued through the
    module-level ``processor``.
    """
    config = VerificationConfig()
    config.disable("Started")
    # NOTE(review): 1 and 5 are presumably the window start/stop -- confirm
    # the units against the VerificationConfig documentation.
    config.modify_check_window("Queued", 1, 5)

    arguments = {"voltage_num": 1}
    issued = processor.issue_command(
        "/YSS/SIMULATOR/SWITCH_VOLTAGE_OFF",
        args=arguments,
        verification=config,
    )
    print("Issued", issued)
def issue_command_no_verification():
    """Send SWITCH_VOLTAGE_OFF with verification disabled.

    ``disable()`` is called without naming a verifier, and the resulting
    configuration is passed along with the command issued through the
    module-level ``processor``.
    """
    config = VerificationConfig()
    config.disable()

    arguments = {"voltage_num": 1}
    issued = processor.issue_command(
        "/YSS/SIMULATOR/SWITCH_VOLTAGE_OFF",
        args=arguments,
        verification=config,
    )
    print("Issued", issued)
def monitor_command():
    """Issue a command, wait for it to complete, and follow up on success.

    SWITCH_VOLTAGE_OFF is issued over a command connection. Once it has
    completed, SWITCH_VOLTAGE_ON is issued if the first command succeeded;
    otherwise the error of the first command is printed.
    """
    connection = processor.create_command_connection()
    first = connection.issue(
        "/YSS/SIMULATOR/SWITCH_VOLTAGE_OFF", args={"voltage_num": 1}
    )

    # The second command must only go out after the first one has finished
    # successfully, so block here until its outcome is known.
    first.await_complete()
    if not first.is_success():
        print("Command 1 failed:", first.error)
        return

    connection.issue("/YSS/SIMULATOR/SWITCH_VOLTAGE_ON", args={"voltage_num": 1})
def monitor_acknowledgment():
    """Issue a command and print the status of its "Acknowledge_Sent" ack.

    The command is issued over a command connection obtained from the
    module-level ``processor``.
    """
    connection = processor.create_command_connection()
    issued = connection.issue(
        "/YSS/SIMULATOR/SWITCH_VOLTAGE_OFF", args={"voltage_num": 1}
    )

    # Wait for the named acknowledgment before reading its status.
    acknowledgment = issued.await_acknowledgment("Acknowledge_Sent")
    print(acknowledgment.status)
def listen_to_command_history():
    """Subscribe to command history and print every update received.

    The subscription is created on the module-level ``processor``; each
    record delivered to the callback is printed with a "TC:" prefix.
    """

    def print_history_record(rec):
        # Invoked by the subscription for each command history update.
        print("TC:", rec)

    processor.create_command_history_subscription(on_data=print_history_record)
def tm_callback(delivery):
    """Print every parameter of a delivery, one per line, prefixed with "TM:".

    Used as the ``on_data`` callback of the parameter subscription created in
    the ``__main__`` section.
    """
    received = delivery.parameters
    for entry in received:
        print("TM:", entry)
if __name__ == "__main__":
    # Connect to the server and select the processor that all example
    # functions above use through the module-level name ``processor``.
    client = YamcsClient("localhost:8090")
    processor = client.get_processor("simulator", "realtime")

    # Subscribe to command history first so that the commands issued below
    # show up as "TC:" lines.
    print("Start to listen to command history")
    listen_to_command_history()

    # Exercise the verification variants before the plain command.
    issue_command_no_verification()
    issue_command_modify_verification()

    print("Issue a command")
    issue_command()

    # Watch the battery voltage parameter to confirm that it drops to 0.
    subscription = processor.create_parameter_subscription(
        ["/YSS/SIMULATOR/BatteryVoltage1"],
        on_data=tm_callback,
    )

    # The subscription does not block, so keep the script running for a
    # while to let the callbacks print incoming updates.
    sleep(20)