Python Yamcs Client
- General Client
- Mission Database
- TM/TC Processing
- Archive
- Link Management
- Object Storage
- File Transfer
- Time Correlation (TCO)
- Timeline
- Examples
- alarms.py
- archive_breakdown.py
- archive_retrieval.py
- authenticate.py
- ccsds_completeness.py
- commanding.py
- cop1.py
- file_transfer.py
- links.py
- events.py
- mission_time.py
- packet_subscription.py
- parameter_subscription.py
- plot_with_matplotlib.py
- query_mdb.py
- read_write_parameters.py
- reconnection.py
- timeline.py
- write_mdb.py
Related
Download this Document
parameter_subscription.pyΒΆ
from time import sleep
from yamcs.client import YamcsClient
def poll_values():
"""Shows how to poll values from the subscription."""
subscription = processor.create_parameter_subscription(
["/YSS/SIMULATOR/BatteryVoltage1"]
)
sleep(5)
print("Latest value:")
print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage1"))
sleep(5)
print("Latest value:")
print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage1"))
def receive_callbacks():
"""Shows how to receive callbacks on value updates."""
def print_data(data):
for parameter in data.parameters:
print(parameter)
processor.create_parameter_subscription(
"/YSS/SIMULATOR/BatteryVoltage1", on_data=print_data
)
sleep(5) # Subscription is non-blocking
def manage_subscription():
"""Shows how to interact with a parameter subscription."""
subscription = processor.create_parameter_subscription(
["/YSS/SIMULATOR/BatteryVoltage1"]
)
sleep(5)
print("Adding extra items to the existing subscription...")
subscription.add(
[
"/YSS/SIMULATOR/Alpha",
"/YSS/SIMULATOR/BatteryVoltage2",
"MDB:OPS Name/SIMULATOR_PrimBusVoltage1",
]
)
sleep(5)
print("Shrinking subscription...")
subscription.remove("/YSS/SIMULATOR/Alpha")
print("Cancelling the subscription...")
subscription.cancel()
print("Last values from cache:")
print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage1"))
print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage2"))
print(subscription.get_value("/YSS/SIMULATOR/Alpha"))
print(subscription.get_value("MDB:OPS Name/SIMULATOR_PrimBusVoltage1"))
if __name__ == "__main__":
client = YamcsClient("localhost:8090")
processor = client.get_processor("simulator", "realtime")
print("Poll value cache")
poll_values()
print("\nReceive callbacks")
receive_callbacks()
print("\nModify the subscription")
manage_subscription()