/  Python API  /  TM/TC Processing

TM/TC Processing

The TM/TC API provides methods that you can use to programmatically interact with a TM/TC processor.

Reference

Snippets

Create a ProcessorClient for a specific processor:

from yamcs.client import YamcsClient

client = YamcsClient('localhost:8090')
processor = client.get_processor(instance='simulator', processor='realtime')

Read/Write Parameters

Read a single value. This returns the latest value from server cache.

pval = processor.get_parameter_value("/YSS/SIMULATOR/BatteryVoltage1")
print(pval)

Read a single value, but block until a fresh value could be processed:

pval = processor.get_parameter_value(
    "/YSS/SIMULATOR/BatteryVoltage2", from_cache=False, timeout=5
)
print(pval)

Read the latest value of multiple parameters at the same time:

pvals = processor.get_parameter_values(
    ["/YSS/SIMULATOR/BatteryVoltage1", "/YSS/SIMULATOR/BatteryVoltage2"]
)
print("battery1", pvals[0])
print("battery2", pvals[1])

Set the value of a parameter. Only some types of parameters can be written to. This includes software parameters (local to Yamcs) and parameters that are linked to an external system (such as a simulator).

processor.set_parameter_value("/YSS/SIMULATOR/AllowCriticalTC1", True)

Set the value of multiple parameters:

processor.set_parameter_values(
    {
        "/YSS/SIMULATOR/AllowCriticalTC1": False,
        "/YSS/SIMULATOR/AllowCriticalTC2": False,
    }
)

Parameter Subscription

Poll latest values from a subscription:

subscription = processor.create_parameter_subscription(
    ["/YSS/SIMULATOR/BatteryVoltage1"]
)

sleep(5)
print("Latest value:")
print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage1"))

sleep(5)
print("Latest value:")
print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage1"))

Receive ParameterData callbacks whenever one or more of the subscribed parameters have been updated:


def print_data(data):
    for parameter in data.parameters:
        print(parameter)

processor.create_parameter_subscription(
    "/YSS/SIMULATOR/BatteryVoltage1", on_data=print_data
)
sleep(5)  # Subscription is non-blocking

Create and modify a parameter subscription:

subscription = processor.create_parameter_subscription(
    ["/YSS/SIMULATOR/BatteryVoltage1"]
)

sleep(5)

print("Adding extra items to the existing subscription...")
subscription.add(
    [
        "/YSS/SIMULATOR/Alpha",
        "/YSS/SIMULATOR/BatteryVoltage2",
        "MDB:OPS Name/SIMULATOR_PrimBusVoltage1",
    ]
)

sleep(5)

print("Shrinking subscription...")
subscription.remove("/YSS/SIMULATOR/Alpha")

print("Cancelling the subscription...")
subscription.cancel()

print("Last values from cache:")
print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage1"))
print(subscription.get_value("/YSS/SIMULATOR/BatteryVoltage2"))
print(subscription.get_value("/YSS/SIMULATOR/Alpha"))
print(subscription.get_value("MDB:OPS Name/SIMULATOR_PrimBusVoltage1"))

Commanding

Issue a command (fire-and-forget):

command = processor.issue_command(
    "/YSS/SIMULATOR/SWITCH_VOLTAGE_OFF", args={"voltage_num": 1}
)
print("Issued", command)

To monitor acknowledgments, establish a command connection first. Commands issued from this connection are automatically updated with progress status:

conn = processor.create_command_connection()

command = conn.issue("/YSS/SIMULATOR/SWITCH_VOLTAGE_OFF", args={"voltage_num": 1})

ack = command.await_acknowledgment("Acknowledge_Sent")
print(ack.status)

The default Yamcs-local acknowledgments are:

  • Acknowledge_Queued

  • Acknowledge_Released

  • Acknowledge_Sent

Custom telemetry verifiers or command links may cause additional acknowledgments to be generated.

If configured, command completion can also be monitored:

conn = processor.create_command_connection()

command1 = conn.issue("/YSS/SIMULATOR/SWITCH_VOLTAGE_OFF", args={"voltage_num": 1})

# Issue 2nd command only if the previous command was completed successfully.
command1.await_complete()
if command1.is_success():
    conn.issue("/YSS/SIMULATOR/SWITCH_VOLTAGE_ON", args={"voltage_num": 1})
else:
    print("Command 1 failed:", command1.error)

Alarm Monitoring

Receive AlarmUpdate callbacks:


def callback(alarm_update):
    print("Alarm Update:", alarm_update)

processor.create_alarm_subscription(callback)

Acknowledge all active alarms:

for alarm in processor.list_alarms():
    if not alarm.is_acknowledged:
        processor.acknowledge_alarm(alarm, comment="false alarm")