File size: 19,895 Bytes
137e4be
 
8f32de1
137e4be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8f32de1
137e4be
8f32de1
137e4be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
"""Reachy Mini Controller - Fun, Queue-Based Control Interface"""

import sys
import subprocess
import threading
import time
from dataclasses import dataclass
from typing import List, Optional

import gradio as gr
import numpy as np
import cv2

from reachy_mini import ReachyMini
from reachy_mini.daemon.backend.mujoco.video_udp import UDPJPEGFrameReceiver
from reachy_mini.utils import create_head_pose


@dataclass
class Movement:
    name: str
    x: float = 0
    y: float = 0
    z: float = 0
    roll: float = 0
    pitch: float = 0
    yaw: float = 0
    left_antenna: Optional[float] = None
    right_antenna: Optional[float] = None
    duration: float = 1.0


# Preset movements library
PRESET_MOVEMENTS = {
    "Home": Movement("Home", 0, 0, 0, 0, 0, 0, 0, 0),
    "Look Left": Movement("Look Left", 0, 0, 0, 0, 0, -30),
    "Look Right": Movement("Look Right", 0, 0, 0, 0, 0, 30),
    "Look Up": Movement("Look Up", 0, 0, 0, 0, -20, 0),
    "Look Down": Movement("Look Down", 0, 0, 0, 0, 15, 0),
    "Tilt Left": Movement("Tilt Left", 0, 0, 0, -20, 0, 0),
    "Tilt Right": Movement("Tilt Right", 0, 0, 0, 20, 0, 0),
    "Curious": Movement("Curious", 10, 0, 10, 15, -10, -15, 45, -45),
    "Excited": Movement("Excited", 0, 0, 20, 0, -15, 0, 90, 90),
    "Shy": Movement("Shy", -10, 0, -10, 10, 10, 20, -30, 30),
}

# Preset sequences
PRESET_SEQUENCES = {
    "Wave": ["Home", "Look Left", "Look Right", "Look Left", "Look Right", "Home"],
    "Nod": ["Home", "Look Down", "Look Up", "Look Down", "Home"],
    "Excited Dance": ["Home", "Excited", "Tilt Left", "Tilt Right", "Tilt Left", "Home"],
    "Look Around": ["Home", "Look Left", "Look Up", "Look Right", "Look Down", "Home"],
    "Curious Peek": ["Home", "Curious", "Look Right", "Look Left", "Home"],
}


class ReachyController:
    def __init__(self):
        self.daemon_process = None
        self.reachy_mini = None
        self.receiver = None
        self.frame_thread = None
        self.running = False
        self.frame_received_event = threading.Event()
        self.current_frame = np.zeros((1080, 1080, 3), dtype=np.uint8)
        
        self.movement_queue: List[Movement] = []
        self.is_playing = False
        self.playback_speed = 1.0
        self.play_thread = None
        self.auto_play = True  # Auto-play mode enabled by default
        
    def start_daemon(self):
        """Start the Reachy Mini daemon"""
        try:
            if self.daemon_process is not None:
                return "โš ๏ธ Daemon already running"
            
            python_cmd = "mjpython" if sys.platform == "darwin" else "python"
            self.daemon_process = subprocess.Popen(
                [python_cmd, "-m", "reachy_mini.daemon.app.main", "--sim",
                 "--scene", "minimal", "--headless", "--stream-robot-view"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            
            self.start_frame_listener()
            frame_received = self.frame_received_event.wait(timeout=10)
            
            if self.daemon_process.poll() is not None:
                return "โŒ Daemon failed to start"
            
            return "โœ… Daemon started" if frame_received else "โš ๏ธ Daemon started (no video)"
        except Exception as e:
            return f"โŒ Daemon error: {str(e)}"
    
    def start_frame_listener(self):
        if self.receiver is None:
            self.receiver = UDPJPEGFrameReceiver(listen_port=5010)
        
        if self.frame_thread and self.frame_thread.is_alive():
            return
        
        self.running = True
        self.frame_thread = threading.Thread(target=self._recv_loop, daemon=True)
        self.frame_thread.start()
    
    def _recv_loop(self):
        while self.running:
            frame = self.receiver.recv_frame()
            if frame is not None:
                # Resize frame from 640x640 to 1080x1080
                resized_frame = cv2.resize(frame, (1080, 1080), interpolation=cv2.INTER_CUBIC)
                self.current_frame = resized_frame
                self.frame_received_event.set()
    
    def stream_frames(self):
        while True:
            yield self.current_frame
            time.sleep(0.04)
    
    def initialize_robot(self):
        """Initialize robot connection"""
        try:
            if self.reachy_mini is not None:
                return "โš ๏ธ Robot already connected"
            
            self.reachy_mini = ReachyMini(media_backend="default_no_video")
            return "โœ… Robot connected - Ready to control!"
        except Exception as e:
            return f"โŒ Connection failed: {str(e)}"
    
    def auto_start(self):
        """Auto-start daemon and robot connection"""
        status_msgs = []
        
        # Start daemon
        msg = self.start_daemon()
        status_msgs.append(msg)
        yield "\n".join(status_msgs)
        
        # Initialize robot
        msg = self.initialize_robot()
        status_msgs.append(msg)
        yield "\n".join(status_msgs)
    
    def restart_system(self):
        """Restart daemon and robot connection"""
        self.is_playing = False
        
        # Stop everything
        if self.daemon_process:
            self.daemon_process.terminate()
            self.daemon_process.wait(timeout=5)
            self.daemon_process = None
        
        self.running = False
        
        if self.frame_thread:
            self.frame_thread.join(timeout=2)
        
        if self.receiver:
            self.receiver.close()
            self.receiver = None
        
        if self.reachy_mini:
            try:
                self.reachy_mini.__exit__(None, None, None)
            except:
                pass
            self.reachy_mini = None
        
        yield "๐Ÿ”„ Restarting..."
        
        # Restart
        for status in self.auto_start():
            yield status
    
    def stop_all(self):
        """Stop everything"""
        self.is_playing = False
        
        if self.daemon_process:
            self.daemon_process.terminate()
            self.daemon_process.wait(timeout=5)
            self.daemon_process = None
        
        self.running = False
        
        if self.frame_thread:
            self.frame_thread.join()
        
        if self.receiver:
            self.receiver.close()
            self.receiver = None
        
        if self.reachy_mini:
            try:
                self.reachy_mini.__exit__(None, None, None)
            except:
                pass
            self.reachy_mini = None
        
        return "โœ… Stopped"
    
    def add_to_queue(self, movement_name, x, y, z, roll, pitch, yaw, 
                     left_ant, right_ant, duration):
        """Add a movement to the queue"""
        movement = Movement(
            name=movement_name or f"Custom {len(self.movement_queue) + 1}",
            x=x, y=y, z=z,
            roll=roll, pitch=pitch, yaw=yaw,
            left_antenna=left_ant,
            right_antenna=right_ant,
            duration=duration
        )
        self.movement_queue.append(movement)
        
        # Auto-play if enabled and not already playing
        if self.auto_play and not self.is_playing:
            self._start_auto_play()
        
        return self.format_queue(), f"โœ… Added: {movement.name}"
    
    def add_preset(self, preset_name):
        """Add a preset movement to queue"""
        if preset_name not in PRESET_MOVEMENTS:
            return self.format_queue(), f"โŒ Unknown preset: {preset_name}"
        
        self.movement_queue.append(PRESET_MOVEMENTS[preset_name])
        
        # Auto-play if enabled and not already playing
        if self.auto_play and not self.is_playing:
            self._start_auto_play()
        
        return self.format_queue(), f"โœ… Added: {preset_name}"
    
    def add_sequence(self, sequence_name):
        """Add a preset sequence to queue"""
        if sequence_name not in PRESET_SEQUENCES:
            return self.format_queue(), f"โŒ Unknown sequence"
        
        for preset_name in PRESET_SEQUENCES[sequence_name]:
            self.movement_queue.append(PRESET_MOVEMENTS[preset_name])
        
        # Auto-play if enabled and not already playing
        if self.auto_play and not self.is_playing:
            self._start_auto_play()
        
        return self.format_queue(), f"โœ… Added sequence: {sequence_name}"
    
    def clear_queue(self):
        """Clear the movement queue"""
        self.movement_queue.clear()
        self.is_playing = False  # Stop playback when clearing
        return self.format_queue(), "๐Ÿ—‘๏ธ Queue cleared"
    
    def remove_last(self):
        """Remove last movement from queue"""
        if self.movement_queue:
            removed = self.movement_queue.pop()
            return self.format_queue(), f"๐Ÿ—‘๏ธ Removed: {removed.name}"
        return self.format_queue(), "โš ๏ธ Queue is empty"
    
    def format_queue(self):
        """Format queue for display"""
        if not self.movement_queue:
            return "๐Ÿ“‹ Queue is empty\n\nAdd movements using presets or custom controls"
        
        lines = ["๐Ÿ“‹ Movement Queue:\n"]
        total_duration = 0
        
        for i, mov in enumerate(self.movement_queue, 1):
            total_duration += mov.duration
            emoji = "โ–ถ๏ธ" if i == 1 else "โธ๏ธ"
            
            # Format head position
            head_str = f"Head: x={mov.x:.0f} y={mov.y:.0f} z={mov.z:.0f} r={mov.roll:.0f}ยฐ p={mov.pitch:.0f}ยฐ y={mov.yaw:.0f}ยฐ"
            
            # Format antennas if present
            ant_str = ""
            if mov.left_antenna is not None and mov.right_antenna is not None:
                ant_str = f"\n   Antennas: L={mov.left_antenna:.0f}ยฐ R={mov.right_antenna:.0f}ยฐ"
            
            lines.append(
                f"{emoji} {i}. {mov.name} ({mov.duration}s)\n"
                f"   {head_str}{ant_str}"
            )
        
        lines.append(f"\nโฑ๏ธ Total duration: {total_duration:.1f}s")
        lines.append(f"{'๐Ÿ”„ Auto-play: ON' if self.auto_play else 'โธ๏ธ Auto-play: OFF'}")
        return "\n".join(lines)
    
    def play_queue(self, speed):
        """Execute the movement queue"""
        if not self.movement_queue:
            return self.format_queue(), "โš ๏ธ Queue is empty"
        
        if self.reachy_mini is None:
            return self.format_queue(), "โŒ Robot not initialized"
        
        if self.is_playing:
            return self.format_queue(), "โš ๏ธ Already playing"
        
        self.playback_speed = speed
        self.is_playing = True
        self.play_thread = threading.Thread(target=self._play_loop, daemon=True)
        self.play_thread.start()
        
        return self.format_queue(), f"โ–ถ๏ธ Playing at {speed}x speed..."
    
    def _play_loop(self):
        """Background thread to execute movements"""
        try:
            current_index = 0
            while self.is_playing:
                # Check if there are movements to play
                if current_index < len(self.movement_queue):
                    movement = self.movement_queue[current_index]
                    
                    # Create pose
                    pose = create_head_pose(
                        x=movement.x, y=movement.y, z=movement.z,
                        roll=movement.roll, pitch=movement.pitch, yaw=movement.yaw,
                        degrees=True, mm=True
                    )
                    
                    # Adjust duration by playback speed
                    actual_duration = movement.duration / self.playback_speed
                    
                    
                    if movement.left_antenna is not None and movement.right_antenna is not None:
                        self.reachy_mini.goto_target(
                            head=pose,
                            antennas=[
                                np.deg2rad(movement.right_antenna),
                                np.deg2rad(movement.left_antenna)
                            ],
                            duration=actual_duration
                        )
                    else:
                        self.reachy_mini.goto_target(head=pose, duration=actual_duration)
                    
                    current_index += 1
                else:
                    # No more movements, stop if not in auto-play mode
                    if not self.auto_play:
                        break
                    # In auto-play mode, wait for new movements
                    time.sleep(0.1)
            
        except Exception as e:
            print(f"Error during playback: {e}")
        finally:
            self.is_playing = False
    
    def _start_auto_play(self):
        """Start auto-play mode"""
        if self.reachy_mini is None:
            return
        
        if not self.is_playing:
            self.is_playing = True
            self.play_thread = threading.Thread(target=self._play_loop, daemon=True)
            self.play_thread.start()
    
    def toggle_auto_play(self, enabled):
        """Toggle auto-play mode"""
        self.auto_play = enabled
        
        if self.auto_play and self.movement_queue and not self.is_playing:
            self._start_auto_play()
        
        return self.format_queue(), f"{'๐Ÿ”„ Auto-play enabled' if enabled else 'โธ๏ธ Auto-play disabled'}"
    
    def update_speed(self, speed):
        """Update playback speed in real-time"""
        self.playback_speed = speed
        return f"โšก Speed: {speed}x"
    
    def stop_playback(self):
        """Stop current playback"""
        self.is_playing = False
        if self.play_thread:
            self.play_thread.join(timeout=2)
        
        # If auto-play is still enabled, inform user
        msg = "โน๏ธ Stopped"
        if self.auto_play:
            msg += " (auto-play still enabled)"
        
        return self.format_queue(), msg


# Create manager
manager = ReachyController()

# Build Gradio interface with improved layout
with gr.Blocks(title="Reachy Controller", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# ๐Ÿค– Reachy Mini Controller")
    gr.Markdown("Create fun movement sequences for your robot!")
    
    with gr.Row():
        # Left panel - Controls (narrower)
        with gr.Column(scale=2):
            # Status section
            gr.Markdown("### ๐Ÿ“ก System Status")
            status = gr.Textbox(
                label="Status",
                lines=3,
                interactive=False,
                value="๐Ÿ”„ Initializing system..."
            )
            restart_btn = gr.Button("๐Ÿ”„ Restart System", variant="secondary", size="sm")
            
            gr.Markdown("### ๐ŸŽฎ Playback Controls")
            
            auto_play_toggle = gr.Checkbox(
                label="๐Ÿ”„ Auto-play",
                value=True,
                info="Execute movements automatically when added"
            )
            
            speed_slider = gr.Slider(
                0.25, 3.0, 1.0,
                label="โšก Speed Multiplier",
                info="Adjust playback speed"
            )
            
            with gr.Row():
                play_btn = gr.Button("โ–ถ๏ธ Play All", variant="primary", scale=2)
                stop_play_btn = gr.Button("โน๏ธ Stop", scale=1)
            
            with gr.Row():
                clear_btn = gr.Button("๐Ÿ—‘๏ธ Clear All")
                remove_btn = gr.Button("โ†ถ Remove Last")
            
            # Queue display
            queue_display = gr.Textbox(
                label="๐Ÿ“‹ Movement Queue",
                lines=20,
                interactive=False,
                value=manager.format_queue()
            )
        
        # Right panel - Simulation view (larger and more square)
        with gr.Column(scale=3):
            sim_view = gr.Image(
                label="๐ŸŽฌ Robot Simulation",
                type="numpy",
                height=1080,
                width=1080,
                show_label=True
            )
    
    # Movement builder section below
    with gr.Row():
        with gr.Column():
            gr.Markdown("### ๐ŸŽจ Quick Presets")
            
            with gr.Row():
                preset_btns = []
                for preset in list(PRESET_MOVEMENTS.keys())[:5]:
                    btn = gr.Button(preset, size="sm")
                    preset_btns.append((btn, preset))
            
            with gr.Row():
                for preset in list(PRESET_MOVEMENTS.keys())[5:]:
                    btn = gr.Button(preset, size="sm")
                    preset_btns.append((btn, preset))
        
        with gr.Column():
            gr.Markdown("### ๐ŸŽฌ Sequences")
            with gr.Row():
                sequence_dropdown = gr.Dropdown(
                    choices=list(PRESET_SEQUENCES.keys()),
                    label="Select Sequence",
                    value=None,
                    scale=3
                )
                add_seq_btn = gr.Button("โž• Add", scale=1)
    
    # Custom movement controls in accordion
    with gr.Accordion("๐ŸŽฏ Custom Movement Builder", open=False):
        custom_name = gr.Textbox(label="Movement Name", placeholder="My Move")
        
        with gr.Row():
            x = gr.Slider(-50, 50, 0, label="X (mm)", step=5)
            y = gr.Slider(-50, 50, 0, label="Y (mm)", step=5)
            z = gr.Slider(-20, 50, 0, label="Z (mm)", step=5)
        
        with gr.Row():
            roll = gr.Slider(-30, 30, 0, label="Roll (ยฐ)", step=5)
            pitch = gr.Slider(-30, 30, 0, label="Pitch (ยฐ)", step=5)
            yaw = gr.Slider(-45, 45, 0, label="Yaw (ยฐ)", step=5)
        
        with gr.Row():
            left_ant = gr.Slider(-180, 180, 0, label="Left Antenna (ยฐ)", step=15)
            right_ant = gr.Slider(-180, 180, 0, label="Right Antenna (ยฐ)", step=15)
        
        duration = gr.Slider(0.3, 3.0, 1.0, label="Duration (s)", step=0.1)
        
        add_custom_btn = gr.Button("โž• Add to Queue", variant="primary")
    
    # Auto-start on load
    demo.load(
        fn=manager.auto_start,
        outputs=[status]
    )
    
    # Stream video
    demo.load(fn=manager.stream_frames, outputs=sim_view)
    
    # Connect events - System control
    restart_btn.click(fn=manager.restart_system, outputs=[status])
    
    # Connect events - Playback control
    auto_play_toggle.change(
        fn=manager.toggle_auto_play,
        inputs=[auto_play_toggle],
        outputs=[queue_display, status]
    )
    
    speed_slider.change(
        fn=manager.update_speed,
        inputs=[speed_slider],
        outputs=[status]
    )
    
    play_btn.click(
        fn=manager.play_queue,
        inputs=[speed_slider],
        outputs=[queue_display, status]
    )
    stop_play_btn.click(fn=manager.stop_playback, outputs=[queue_display, status])
    clear_btn.click(fn=manager.clear_queue, outputs=[queue_display, status])
    remove_btn.click(fn=manager.remove_last, outputs=[queue_display, status])
    
    # Connect preset buttons
    for btn, preset_name in preset_btns:
        btn.click(
            fn=lambda p=preset_name: manager.add_preset(p),
            outputs=[queue_display, status]
        )
    
    # Connect sequence dropdown
    add_seq_btn.click(
        fn=manager.add_sequence,
        inputs=[sequence_dropdown],
        outputs=[queue_display, status]
    )
    
    # Connect custom movement
    add_custom_btn.click(
        fn=manager.add_to_queue,
        inputs=[custom_name, x, y, z, roll, pitch, yaw, left_ant, right_ant, duration],
        outputs=[queue_display, status]
    )

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)