Python Yamcs Client
      
    
    - General Client
 - Mission Database
 - TM/TC Processing
 - Archive
 - Link Management
 - Object Storage
 - File Transfer
 - Time Correlation (TCO)
 - Timeline
 - Examples
- alarms.py
 - archive_breakdown.py
 - archive_retrieval.py
 - authenticate.py
 - ccsds_completeness.py
 - commanding.py
 - cop1.py
 - file_transfer.py
 - links.py
 - events.py
 - mission_time.py
 - packet_subscription.py
 - parameter_subscription.py
 - plot_with_matplotlib.py
 - query_mdb.py
 - read_write_parameters.py
 - reconnection.py
 - timeline.py
 - write_mdb.py
 
 
        
        Related
      
      
    
    
      
      Download this Document
    
    
  parameter_subscription.pyΒΆ
from time import sleep
from yamcs.client import YamcsClient
def poll_values():
    """Shows how to poll values from the subscription."""
    subscription = processor.create_parameter_subscription(
        ["/YSS/SIMULATOR/BatteryVoltage1"]
    )
    sleep(5)
    print("Latest value:")
    print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage1"))
    sleep(5)
    print("Latest value:")
    print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage1"))
def receive_callbacks():
    """Shows how to receive callbacks on value updates."""
    def print_data(data):
        for parameter in data.parameters:
            print(parameter)
    processor.create_parameter_subscription(
        "/YSS/SIMULATOR/BatteryVoltage1", on_data=print_data
    )
    sleep(5)  # Subscription is non-blocking
def manage_subscription():
    """Shows how to interact with a parameter subscription."""
    subscription = processor.create_parameter_subscription(
        ["/YSS/SIMULATOR/BatteryVoltage1"]
    )
    sleep(5)
    print("Adding extra items to the existing subscription...")
    subscription.add(
        [
            "/YSS/SIMULATOR/Alpha",
            "/YSS/SIMULATOR/BatteryVoltage2",
            "MDB:OPS Name/SIMULATOR_PrimBusVoltage1",
        ]
    )
    sleep(5)
    print("Shrinking subscription...")
    subscription.remove("/YSS/SIMULATOR/Alpha")
    print("Cancelling the subscription...")
    subscription.cancel()
    print("Last values from cache:")
    print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage1"))
    print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage2"))
    print(subscription.get_value("/YSS/SIMULATOR/Alpha"))
    print(subscription.get_value("MDB:OPS Name/SIMULATOR_PrimBusVoltage1"))
if __name__ == "__main__":
    client = YamcsClient("localhost:8090")
    processor = client.get_processor("simulator", "realtime")
    print("Poll value cache")
    poll_values()
    print("\nReceive callbacks")
    receive_callbacks()
    print("\nModify the subscription")
    manage_subscription()