Skip to main content

hydro_lang/deploy/
deploy_graph_containerized.rs

1//! Deployment backend for Hydro that uses Docker to provision and launch services.
2
3use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11    BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12    RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19// Re-export LinuxCompileType so users can configure compile type without depending on hydro_deploy directly.
20pub use hydro_deploy::LinuxCompileType;
21use hydro_deploy::RustCrate;
22use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
23use nanoid::nanoid;
24use proc_macro2::Span;
25use sinktools::lazy::LazySink;
26use stageleft::QuotedWithContext;
27use syn::parse_quote;
28use tar::{Builder, Header};
29use tokio::net::TcpStream;
30use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
31use tracing::{Instrument, instrument, trace, warn};
32
33use super::deploy_runtime_containerized::*;
34use crate::compile::builder::ExternalPortId;
35use crate::compile::deploy::DeployResult;
36use crate::compile::deploy_provider::{
37    ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
38};
39use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
40use crate::location::dynamic::LocationId;
41use crate::location::member_id::TaglessMemberId;
42use crate::location::{LocationKey, MembershipEvent, NetworkHint};
43
44/// represents a docker network
45#[derive(Clone, Debug)]
46pub struct DockerNetwork {
47    name: String,
48}
49
50impl DockerNetwork {
51    /// creates a new docker network (will actually be created when deployment.start() is called).
52    pub fn new(name: String) -> Self {
53        Self {
54            name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
55        }
56    }
57}
58
59/// Represents a process running in a docker container
60#[derive(Clone)]
61pub struct DockerDeployProcess {
62    key: LocationKey,
63    name: String,
64    next_port: Rc<RefCell<u16>>,
65    rust_crate: Rc<RefCell<Option<RustCrate>>>,
66
67    exposed_ports: Rc<RefCell<Vec<u16>>>,
68
69    docker_container_name: Rc<RefCell<Option<String>>>,
70
71    compilation_options: Option<String>,
72
73    config: Vec<String>,
74
75    network: DockerNetwork,
76
77    base_image: Option<String>,
78
79    linux_compile_type: LinuxCompileType,
80
81    features: Vec<String>,
82}
83
84impl Node for DockerDeployProcess {
85    type Port = u16;
86    type Meta = ();
87    type InstantiateEnv = DockerDeploy;
88
89    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
90    fn next_port(&self) -> Self::Port {
91        let port = {
92            let mut borrow = self.next_port.borrow_mut();
93            let port = *borrow;
94            *borrow += 1;
95            port
96        };
97
98        port
99    }
100
101    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
102    fn update_meta(&self, _meta: &Self::Meta) {}
103
104    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
105    fn instantiate(
106        &self,
107        _env: &mut Self::InstantiateEnv,
108        meta: &mut Self::Meta,
109        graph: DfirGraph,
110        extra_stmts: &[syn::Stmt],
111        sidecars: &[syn::Expr],
112    ) {
113        let (bin_name, config) = create_graph_trybuild(
114            graph,
115            extra_stmts,
116            sidecars,
117            Some(&self.name),
118            crate::compile::trybuild::generate::DeployMode::Containerized,
119            LinkingMode::Static,
120        );
121
122        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
123            .target_dir(config.target_dir)
124            .example(bin_name)
125            .no_default_features();
126
127        ret = ret.display_name("test_display_name");
128
129        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
130
131        if let Some(features) = config.features {
132            ret = ret.features(features);
133        }
134
135        if !self.features.is_empty() {
136            ret = ret.features(self.features.clone());
137        }
138
139        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
140        ret = ret.config("build.incremental = false");
141
142        *self.rust_crate.borrow_mut() = Some(ret);
143    }
144}
145
146/// Represents a logical cluster, which can be a variable amount of individual containers.
147#[derive(Clone)]
148pub struct DockerDeployCluster {
149    key: LocationKey,
150    name: String,
151    next_port: Rc<RefCell<u16>>,
152    rust_crate: Rc<RefCell<Option<RustCrate>>>,
153
154    exposed_ports: Rc<RefCell<Vec<u16>>>,
155
156    docker_container_name: Rc<RefCell<Vec<String>>>,
157
158    compilation_options: Option<String>,
159
160    config: Vec<String>,
161
162    count: usize,
163
164    base_image: Option<String>,
165
166    linux_compile_type: LinuxCompileType,
167
168    features: Vec<String>,
169}
170
171impl Node for DockerDeployCluster {
172    type Port = u16;
173    type Meta = ();
174    type InstantiateEnv = DockerDeploy;
175
176    #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
177    fn next_port(&self) -> Self::Port {
178        let port = {
179            let mut borrow = self.next_port.borrow_mut();
180            let port = *borrow;
181            *borrow += 1;
182            port
183        };
184
185        port
186    }
187
188    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
189    fn update_meta(&self, _meta: &Self::Meta) {}
190
191    #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
192    fn instantiate(
193        &self,
194        _env: &mut Self::InstantiateEnv,
195        _meta: &mut Self::Meta,
196        graph: DfirGraph,
197        extra_stmts: &[syn::Stmt],
198        sidecars: &[syn::Expr],
199    ) {
200        let (bin_name, config) = create_graph_trybuild(
201            graph,
202            extra_stmts,
203            sidecars,
204            Some(&self.name),
205            crate::compile::trybuild::generate::DeployMode::Containerized,
206            LinkingMode::Static,
207        );
208
209        let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
210            .target_dir(config.target_dir)
211            .example(bin_name)
212            .no_default_features();
213
214        ret = ret.display_name("test_display_name");
215
216        ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
217
218        if let Some(features) = config.features {
219            ret = ret.features(features);
220        }
221
222        if !self.features.is_empty() {
223            ret = ret.features(self.features.clone());
224        }
225
226        ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
227        ret = ret.config("build.incremental = false");
228
229        *self.rust_crate.borrow_mut() = Some(ret);
230    }
231}
232
233/// Represents an external process, outside the control of this deployment but still with some communication into this deployment.
234#[derive(Clone, Debug)]
235#[expect(
236    dead_code,
237    reason = "fields used via Rc<RefCell> in RegisterPort impl and ExternalBytesPort construction"
238)]
239pub struct DockerDeployExternal {
240    /// The location key for this external, used for port handle construction.
241    pub(crate) key: LocationKey,
242    name: String,
243    next_port: Rc<RefCell<u16>>,
244
245    /// Counter for generating ExternalPortId values at deploy time.
246    next_external_port_id: Rc<RefCell<crate::Counter<ExternalPortId>>>,
247
248    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,
249
250    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
251}
252
253impl Node for DockerDeployExternal {
254    type Port = u16;
255    type Meta = ();
256    type InstantiateEnv = DockerDeploy;
257
258    #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
259    fn next_port(&self) -> Self::Port {
260        let port = {
261            let mut borrow = self.next_port.borrow_mut();
262            let port = *borrow;
263            *borrow += 1;
264            port
265        };
266
267        port
268    }
269
270    #[instrument(level = "trace", skip_all, fields(name = self.name))]
271    fn update_meta(&self, _meta: &Self::Meta) {}
272
273    #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
274    fn instantiate(
275        &self,
276        _env: &mut Self::InstantiateEnv,
277        meta: &mut Self::Meta,
278        graph: DfirGraph,
279        extra_stmts: &[syn::Stmt],
280        sidecars: &[syn::Expr],
281    ) {
282        trace!(name: "surface", surface = graph.surface_syntax_string());
283    }
284}
285
286impl DockerDeployProcess {
287    /// Expose a TCP port on this process for external access.
288    ///
289    /// The binary running on this process must bind a `TcpListener` on this port.
290    /// This method ensures the port appears in the Docker image's `EXPOSE` directives
291    /// and is available for endpoint discovery via [`Self::get_tcp_endpoint`].
292    pub fn expose_port(&self, port: u16) {
293        self.exposed_ports.borrow_mut().push(port);
294    }
295
296    /// Returns the TCP endpoint `(host, port)` for this process exposing
297    /// the given container port. Queries Docker for the dynamically allocated
298    /// host port mapping.
299    pub async fn get_tcp_endpoint(&self, container_port: u16) -> (String, u16) {
300        let name = self
301            .docker_container_name
302            .borrow()
303            .as_ref()
304            .expect("container not yet started")
305            .clone();
306        let host_port = find_dynamically_allocated_docker_port(&name, container_port).await;
307        ("localhost".to_owned(), host_port)
308    }
309}
310
311impl DockerDeployCluster {
312    /// Expose a TCP port on every member of this cluster for external access.
313    ///
314    /// The binary running on this cluster must bind a `TcpListener` on this port.
315    /// This method ensures the port appears in the Docker image's `EXPOSE` directives
316    /// and is available for endpoint discovery via [`Self::get_all_tcp_endpoints`].
317    pub fn expose_port(&self, port: u16) {
318        self.exposed_ports.borrow_mut().push(port);
319    }
320
321    /// Returns TCP endpoints `(host, port)` for all cluster members exposing
322    /// the given container port. Queries Docker for the dynamically allocated
323    /// host port mapping.
324    pub async fn get_all_tcp_endpoints(&self, container_port: u16) -> Vec<(String, u16)> {
325        let names = self.docker_container_name.borrow().clone();
326        let mut endpoints = Vec::with_capacity(names.len());
327        for name in names {
328            let host_port = find_dynamically_allocated_docker_port(&name, container_port).await;
329            endpoints.push(("localhost".to_owned(), host_port));
330        }
331        endpoints
332    }
333}
334
335type DynSourceSink<Out, In, InErr> = (
336    Pin<Box<dyn Stream<Item = Out>>>,
337    Pin<Box<dyn Sink<In, Error = InErr>>>,
338);
339
340impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
341    #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
342    fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
343        self.ports.borrow_mut().insert(external_port_id, port);
344    }
345
346    fn as_bytes_bidi(
347        &self,
348        external_port_id: ExternalPortId,
349    ) -> impl Future<
350        Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
351    > + 'a {
352        let guard =
353            tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
354
355        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
356        let (docker_container_name, remote_port, _) = self
357            .connection_info
358            .borrow()
359            .get(&local_port)
360            .unwrap()
361            .clone();
362
363        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
364
365        async move {
366            let local_port =
367                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
368            let remote_ip_address = "localhost";
369
370            trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
371
372            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
373                .await
374                .unwrap();
375
376            trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
377
378            let (rx, tx) = stream.into_split();
379
380            let source = Box::pin(
381                FramedRead::new(rx, LengthDelimitedCodec::new()),
382            ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
383
384            let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
385                as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
386
387            (source, sink)
388        }
389        .instrument(guard.exit())
390    }
391
392    fn as_bincode_bidi<InT, OutT>(
393        &self,
394        external_port_id: ExternalPortId,
395    ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
396    where
397        InT: serde::Serialize + 'static,
398        OutT: serde::de::DeserializeOwned + 'static,
399    {
400        let guard =
401            tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
402
403        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
404        let (docker_container_name, remote_port, _) = self
405            .connection_info
406            .borrow()
407            .get(&local_port)
408            .unwrap()
409            .clone();
410
411        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
412
413        async move {
414            let local_port =
415                find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
416            let remote_ip_address = "localhost";
417
418            trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
419
420            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
421                .await
422                .unwrap();
423
424            trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
425
426            let (rx, tx) = stream.into_split();
427
428            let source = Box::pin(
429                FramedRead::new(rx, LengthDelimitedCodec::new())
430                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
431            ) as Pin<Box<dyn Stream<Item = OutT>>>;
432
433            let sink = Box::pin(
434                FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
435                    Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
436                }),
437            ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
438
439            (source, sink)
440        }
441        .instrument(guard.exit())
442    }
443
444    fn as_bincode_sink<T>(
445        &self,
446        external_port_id: ExternalPortId,
447    ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
448    where
449        T: serde::Serialize + 'static,
450    {
451        let guard =
452            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
453
454        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
455        let (docker_container_name, remote_port, _) = self
456            .connection_info
457            .borrow()
458            .get(&local_port)
459            .unwrap()
460            .clone();
461
462        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
463
464        async move {
465            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
466            let remote_ip_address = "localhost";
467
468            Box::pin(
469                LazySink::new(move || {
470                    Box::pin(async move {
471                        trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
472
473                        let stream =
474                            TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
475                                .await?;
476
477                        trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
478
479                        Result::<_, std::io::Error>::Ok(FramedWrite::new(
480                            stream,
481                            LengthDelimitedCodec::new(),
482                        ))
483                    })
484                })
485                .with(move |v| async move {
486                    Ok(Bytes::from(bincode::serialize(&v).unwrap()))
487                }),
488            ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
489        }
490        .instrument(guard.exit())
491    }
492
493    fn as_bincode_source<T>(
494        &self,
495        external_port_id: ExternalPortId,
496    ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
497    where
498        T: serde::de::DeserializeOwned + 'static,
499    {
500        let guard =
501            tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
502
503        let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
504        let (docker_container_name, remote_port, _) = self
505            .connection_info
506            .borrow()
507            .get(&local_port)
508            .unwrap()
509            .clone();
510
511        let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
512
513        async move {
514
515            let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
516            let remote_ip_address = "localhost";
517
518            trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
519
520            let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
521                .await
522                .unwrap();
523
524            trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
525
526            Box::pin(
527                FramedRead::new(stream, LengthDelimitedCodec::new())
528                    .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
529            ) as Pin<Box<dyn Stream<Item = T>>>
530        }
531        .instrument(guard.exit())
532    }
533}
534
535#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
536async fn find_dynamically_allocated_docker_port(
537    docker_container_name: &str,
538    destination_port: u16,
539) -> u16 {
540    let docker = Docker::connect_with_local_defaults().unwrap();
541
542    let container_info = docker
543        .inspect_container(docker_container_name, None::<InspectContainerOptions>)
544        .await
545        .unwrap();
546
547    trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
548
549    // container_info={"1001/tcp": Some([PortBinding { host_ip: Some("0.0.0.0"), host_port: Some("32771") }, PortBinding { host_ip: Some("::"), host_port: Some("32771") }])} destination_port=1001
550    let remote_port = container_info
551        .network_settings
552        .as_ref()
553        .unwrap()
554        .ports
555        .as_ref()
556        .unwrap()
557        .get(&format!("{destination_port}/tcp"))
558        .unwrap()
559        .as_ref()
560        .unwrap()
561        .iter()
562        .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
563        .unwrap()
564        .host_port
565        .as_ref()
566        .unwrap()
567        .parse()
568        .unwrap();
569
570    remote_port
571}
572
573/// For deploying to a local docker instance
574pub struct DockerDeploy {
575    docker_processes: Vec<DockerDeployProcessSpec>,
576    docker_clusters: Vec<DockerDeployClusterSpec>,
577    network: DockerNetwork,
578    deployment_instance: String,
579}
580
581#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
582async fn create_and_start_container(
583    docker: &Docker,
584    container_name: &str,
585    image_name: &str,
586    network_name: &str,
587    deployment_instance: &str,
588) -> Result<(), anyhow::Error> {
589    let config = ContainerCreateBody {
590        image: Some(image_name.to_owned()),
591        hostname: Some(container_name.to_owned()),
592        host_config: Some(HostConfig {
593            binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
594            publish_all_ports: Some(true),
595            port_bindings: Some(HashMap::new()), /* Due to a bug in docker, if you don't send empty port bindings with publish_all_ports set to true and with a docker image that has EXPOSE directives in it, docker will crash because it will try to write to a map in memory that it has not initialized yet. Setting port_bindings explicitly to an empty map will initialize it first so that it does not break. */
596            ..Default::default()
597        }),
598        env: Some(vec![
599            format!("CONTAINER_NAME={container_name}"),
600            format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
601            format!("RUST_LOG=trace"),
602        ]),
603        networking_config: Some(NetworkingConfig {
604            endpoints_config: Some(HashMap::from([(
605                network_name.to_owned(),
606                EndpointSettings {
607                    ..Default::default()
608                },
609            )])),
610        }),
611        tty: Some(true),
612        ..Default::default()
613    };
614
615    let options = CreateContainerOptions {
616        name: Some(container_name.to_owned()),
617        ..Default::default()
618    };
619
620    tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
621    docker.create_container(Some(options), config).await?;
622    docker
623        .start_container(container_name, None::<StartContainerOptions>)
624        .await?;
625
626    Ok(())
627}
628
629#[instrument(level = "trace", skip_all, fields(%image_name))]
630async fn build_and_create_image(
631    rust_crate: &Rc<RefCell<Option<RustCrate>>>,
632    compilation_options: Option<&str>,
633    config: &[String],
634    exposed_ports: &[u16],
635    image_name: &str,
636    base_image: Option<&str>,
637    linux_compile_type: LinuxCompileType,
638) -> Result<(), anyhow::Error> {
639    let mut rust_crate = rust_crate
640        .borrow_mut()
641        .take()
642        .unwrap()
643        .rustflags(compilation_options.unwrap_or_default());
644
645    for cfg in config {
646        rust_crate = rust_crate.config(cfg);
647    }
648
649    let build_output = match build_crate_memoized(
650        rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(linux_compile_type)),
651    )
652    .await
653    {
654        Ok(build_output) => build_output,
655        Err(BuildError::FailedToBuildCrate {
656            exit_status,
657            diagnostics,
658            text_lines,
659            stderr_lines,
660        }) => {
661            let diagnostics = diagnostics
662                .into_iter()
663                .map(|d| d.rendered.unwrap())
664                .collect::<Vec<_>>()
665                .join("\n");
666            let text_lines = text_lines.join("\n");
667            let stderr_lines = stderr_lines.join("\n");
668
669            anyhow::bail!(
670                r#"
671Failed to build crate {exit_status:?}
672--- diagnostics
673---
674{diagnostics}
675---
676---
677---
678
679--- text_lines
680---
681---
682{text_lines}
683---
684---
685---
686
687--- stderr_lines
688---
689---
690{stderr_lines}
691---
692---
693---"#
694            );
695        }
696        Err(err) => {
697            anyhow::bail!("Failed to build crate {err:?}");
698        }
699    };
700
701    let docker = Docker::connect_with_local_defaults()?;
702
703    let mut tar_data = Vec::new();
704    {
705        let mut tar = Builder::new(&mut tar_data);
706
707        let exposed_ports = exposed_ports
708            .iter()
709            .map(|port| format!("EXPOSE {port}/tcp"))
710            .collect::<Vec<_>>()
711            .join("\n");
712
713        let from_image = base_image.unwrap_or("scratch");
714        let dockerfile_content = format!(
715            r#"
716                FROM {from_image}
717                {exposed_ports}
718                COPY app /app
719                CMD ["/app"]
720            "#,
721        );
722
723        trace!(name: "dockerfile", %dockerfile_content);
724
725        let mut header = Header::new_gnu();
726        header.set_path("Dockerfile")?;
727        header.set_size(dockerfile_content.len() as u64);
728        header.set_cksum();
729        tar.append(&header, dockerfile_content.as_bytes())?;
730
731        let mut header = Header::new_gnu();
732        header.set_path("app")?;
733        header.set_size(build_output.bin_data.len() as u64);
734        header.set_mode(0o755);
735        header.set_cksum();
736        tar.append(&header, &build_output.bin_data[..])?;
737
738        tar.finish()?;
739    }
740
741    let build_options = BuildImageOptions {
742        dockerfile: "Dockerfile".to_owned(),
743        t: Some(image_name.to_owned()),
744        rm: true,
745        ..Default::default()
746    };
747
748    use bollard::errors::Error;
749
750    let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
751    let mut build_stream = docker.build_image(build_options, None, Some(body));
752    while let Some(msg) = build_stream.next().await {
753        match msg {
754            Ok(_) => {}
755            Err(e) => match e {
756                Error::DockerStreamError { error } => {
757                    return Err(anyhow::anyhow!(
758                        "Docker build failed: DockerStreamError: {{ error: {error} }}"
759                    ));
760                }
761                _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
762            },
763        }
764    }
765
766    Ok(())
767}
768
769impl DockerDeploy {
770    /// Create a new deployment
771    pub fn new(network: DockerNetwork) -> Self {
772        Self {
773            docker_processes: Vec::new(),
774            docker_clusters: Vec::new(),
775            network,
776            deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
777        }
778    }
779
780    /// Add an internal docker service to the deployment.
781    pub fn add_localhost_docker(
782        &mut self,
783        compilation_options: Option<String>,
784        config: Vec<String>,
785    ) -> DockerDeployProcessSpec {
786        let process = DockerDeployProcessSpec {
787            compilation_options,
788            config,
789            network: self.network.clone(),
790            deployment_instance: self.deployment_instance.clone(),
791            base_image: None,
792            linux_compile_type: LinuxCompileType::Musl,
793            features: vec![],
794        };
795
796        self.docker_processes.push(process.clone());
797
798        process
799    }
800
801    /// Add an internal docker cluster to the deployment.
802    pub fn add_localhost_docker_cluster(
803        &mut self,
804        compilation_options: Option<String>,
805        config: Vec<String>,
806        count: usize,
807    ) -> DockerDeployClusterSpec {
808        let cluster = DockerDeployClusterSpec {
809            compilation_options,
810            config,
811            count,
812            deployment_instance: self.deployment_instance.clone(),
813            base_image: None,
814            linux_compile_type: LinuxCompileType::Musl,
815            features: vec![],
816        };
817
818        self.docker_clusters.push(cluster.clone());
819
820        cluster
821    }
822
823    /// Add an external process to the deployment.
824    pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
825        DockerDeployExternalSpec { name }
826    }
827
828    /// Get the deployment instance from this deployment.
829    pub fn get_deployment_instance(&self) -> String {
830        self.deployment_instance.clone()
831    }
832
833    /// Create docker images.
834    #[instrument(level = "trace", skip_all)]
835    pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
836        for (_, _, process) in nodes.get_all_processes() {
837            let exposed_ports = process.exposed_ports.borrow().clone();
838
839            build_and_create_image(
840                &process.rust_crate,
841                process.compilation_options.as_deref(),
842                &process.config,
843                &exposed_ports,
844                &process.name,
845                process.base_image.as_deref(),
846                process.linux_compile_type,
847            )
848            .await?;
849        }
850
851        for (_, _, cluster) in nodes.get_all_clusters() {
852            let exposed_ports = cluster.exposed_ports.borrow().clone();
853            build_and_create_image(
854                &cluster.rust_crate,
855                cluster.compilation_options.as_deref(),
856                &cluster.config,
857                &exposed_ports,
858                &cluster.name,
859                cluster.base_image.as_deref(),
860                cluster.linux_compile_type,
861            )
862            .await?;
863        }
864
865        Ok(())
866    }
867
868    /// Start the deployment, tell docker to create containers from the existing provisioned images.
869    #[instrument(level = "trace", skip_all)]
870    pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
871        let docker = Docker::connect_with_local_defaults()?;
872
873        match docker
874            .create_network(NetworkCreateRequest {
875                name: self.network.name.clone(),
876                driver: Some("bridge".to_owned()),
877                ..Default::default()
878            })
879            .await
880        {
881            Ok(v) => v.id,
882            Err(e) => {
883                panic!("Failed to create docker network: {e:?}");
884            }
885        };
886
887        for (_, _, process) in nodes.get_all_processes() {
888            let docker_container_name: String = get_docker_container_name(&process.name, None);
889            *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
890
891            create_and_start_container(
892                &docker,
893                &docker_container_name,
894                &process.name,
895                &self.network.name,
896                &self.deployment_instance,
897            )
898            .await?;
899        }
900
901        for (_, _, cluster) in nodes.get_all_clusters() {
902            for num in 0..cluster.count {
903                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
904                cluster
905                    .docker_container_name
906                    .borrow_mut()
907                    .push(docker_container_name.clone());
908
909                create_and_start_container(
910                    &docker,
911                    &docker_container_name,
912                    &cluster.name,
913                    &self.network.name,
914                    &self.deployment_instance,
915                )
916                .await?;
917            }
918        }
919
920        Ok(())
921    }
922
923    /// Stop the deployment, destroy all containers
924    #[instrument(level = "trace", skip_all)]
925    pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
926        let docker = Docker::connect_with_local_defaults()?;
927
928        for (_, _, process) in nodes.get_all_processes() {
929            let docker_container_name: String = get_docker_container_name(&process.name, None);
930
931            docker
932                .kill_container(&docker_container_name, None::<KillContainerOptions>)
933                .await?;
934        }
935
936        for (_, _, cluster) in nodes.get_all_clusters() {
937            for num in 0..cluster.count {
938                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
939
940                docker
941                    .kill_container(&docker_container_name, None::<KillContainerOptions>)
942                    .await?;
943            }
944        }
945
946        Ok(())
947    }
948
949    /// remove containers, images, and networks.
950    #[instrument(level = "trace", skip_all)]
951    pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
952        let docker = Docker::connect_with_local_defaults()?;
953
954        for (_, _, process) in nodes.get_all_processes() {
955            let docker_container_name: String = get_docker_container_name(&process.name, None);
956
957            docker
958                .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
959                .await?;
960        }
961
962        for (_, _, cluster) in nodes.get_all_clusters() {
963            for num in 0..cluster.count {
964                let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
965
966                docker
967                    .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
968                    .await?;
969            }
970        }
971
972        docker
973            .remove_network(&self.network.name)
974            .await
975            .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
976
977        use bollard::query_parameters::RemoveImageOptions;
978
979        for (_, _, process) in nodes.get_all_processes() {
980            docker
981                .remove_image(&process.name, None::<RemoveImageOptions>, None)
982                .await?;
983        }
984
985        for (_, _, cluster) in nodes.get_all_clusters() {
986            docker
987                .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
988                .await?;
989        }
990
991        Ok(())
992    }
993}
994
995impl<'a> Deploy<'a> for DockerDeploy {
996    type Meta = ();
997    type InstantiateEnv = Self;
998
999    type Process = DockerDeployProcess;
1000    type Cluster = DockerDeployCluster;
1001    type External = DockerDeployExternal;
1002
1003    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
1004    fn o2o_sink_source(
1005        _env: &mut Self::InstantiateEnv,
1006        p1: &Self::Process,
1007        p1_port: &<Self::Process as Node>::Port,
1008        p2: &Self::Process,
1009        p2_port: &<Self::Process as Node>::Port,
1010        name: Option<&str>,
1011        networking_info: &crate::networking::NetworkingInfo,
1012    ) -> (syn::Expr, syn::Expr) {
1013        match networking_info {
1014            crate::networking::NetworkingInfo::Tcp {
1015                fault: crate::networking::TcpFault::FailStop,
1016            } => {}
1017            _ => panic!("Unsupported networking info: {:?}", networking_info),
1018        }
1019
1020        deploy_containerized_o2o(
1021            &p2.name,
1022            name.expect("channel name is required for containerized deployment"),
1023        )
1024    }
1025
1026    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
1027    fn o2o_connect(
1028        p1: &Self::Process,
1029        p1_port: &<Self::Process as Node>::Port,
1030        p2: &Self::Process,
1031        p2_port: &<Self::Process as Node>::Port,
1032    ) -> Box<dyn FnOnce()> {
1033        let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1034
1035        Box::new(move || {
1036            trace!(name: "o2o_connect thunk", %serialized);
1037        })
1038    }
1039
1040    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1041    fn o2m_sink_source(
1042        _env: &mut Self::InstantiateEnv,
1043        p1: &Self::Process,
1044        p1_port: &<Self::Process as Node>::Port,
1045        c2: &Self::Cluster,
1046        c2_port: &<Self::Cluster as Node>::Port,
1047        name: Option<&str>,
1048        networking_info: &crate::networking::NetworkingInfo,
1049    ) -> (syn::Expr, syn::Expr) {
1050        match networking_info {
1051            crate::networking::NetworkingInfo::Tcp {
1052                fault: crate::networking::TcpFault::FailStop,
1053            } => {}
1054            _ => panic!("Unsupported networking info: {:?}", networking_info),
1055        }
1056
1057        deploy_containerized_o2m(
1058            name.expect("channel name is required for containerized deployment"),
1059        )
1060    }
1061
1062    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1063    fn o2m_connect(
1064        p1: &Self::Process,
1065        p1_port: &<Self::Process as Node>::Port,
1066        c2: &Self::Cluster,
1067        c2_port: &<Self::Cluster as Node>::Port,
1068    ) -> Box<dyn FnOnce()> {
1069        let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
1070
1071        Box::new(move || {
1072            trace!(name: "o2m_connect thunk", %serialized);
1073        })
1074    }
1075
1076    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1077    fn m2o_sink_source(
1078        _env: &mut Self::InstantiateEnv,
1079        c1: &Self::Cluster,
1080        c1_port: &<Self::Cluster as Node>::Port,
1081        p2: &Self::Process,
1082        p2_port: &<Self::Process as Node>::Port,
1083        name: Option<&str>,
1084        networking_info: &crate::networking::NetworkingInfo,
1085    ) -> (syn::Expr, syn::Expr) {
1086        match networking_info {
1087            crate::networking::NetworkingInfo::Tcp {
1088                fault: crate::networking::TcpFault::FailStop,
1089            } => {}
1090            _ => panic!("Unsupported networking info: {:?}", networking_info),
1091        }
1092
1093        deploy_containerized_m2o(
1094            &p2.name,
1095            name.expect("channel name is required for containerized deployment"),
1096        )
1097    }
1098
1099    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1100    fn m2o_connect(
1101        c1: &Self::Cluster,
1102        c1_port: &<Self::Cluster as Node>::Port,
1103        p2: &Self::Process,
1104        p2_port: &<Self::Process as Node>::Port,
1105    ) -> Box<dyn FnOnce()> {
1106        let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
1107
1108        Box::new(move || {
1109            trace!(name: "m2o_connect thunk", %serialized);
1110        })
1111    }
1112
1113    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1114    fn m2m_sink_source(
1115        _env: &mut Self::InstantiateEnv,
1116        c1: &Self::Cluster,
1117        c1_port: &<Self::Cluster as Node>::Port,
1118        c2: &Self::Cluster,
1119        c2_port: &<Self::Cluster as Node>::Port,
1120        name: Option<&str>,
1121        networking_info: &crate::networking::NetworkingInfo,
1122    ) -> (syn::Expr, syn::Expr) {
1123        match networking_info {
1124            crate::networking::NetworkingInfo::Tcp {
1125                fault: crate::networking::TcpFault::FailStop,
1126            } => {}
1127            _ => panic!("Unsupported networking info: {:?}", networking_info),
1128        }
1129
1130        deploy_containerized_m2m(
1131            name.expect("channel name is required for containerized deployment"),
1132        )
1133    }
1134
1135    #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1136    fn m2m_connect(
1137        c1: &Self::Cluster,
1138        c1_port: &<Self::Cluster as Node>::Port,
1139        c2: &Self::Cluster,
1140        c2_port: &<Self::Cluster as Node>::Port,
1141    ) -> Box<dyn FnOnce()> {
1142        let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1143
1144        Box::new(move || {
1145            trace!(name: "m2m_connect thunk", %serialized);
1146        })
1147    }
1148
1149    #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1150    fn e2o_many_source(
1151        extra_stmts: &mut Vec<syn::Stmt>,
1152        p2: &Self::Process,
1153        p2_port: &<Self::Process as Node>::Port,
1154        codec_type: &syn::Type,
1155        shared_handle: String,
1156    ) -> syn::Expr {
1157        p2.exposed_ports.borrow_mut().push(*p2_port);
1158
1159        let socket_ident = syn::Ident::new(
1160            &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1161            Span::call_site(),
1162        );
1163
1164        let source_ident = syn::Ident::new(
1165            &format!("__hydro_deploy_many_{}_source", &shared_handle),
1166            Span::call_site(),
1167        );
1168
1169        let sink_ident = syn::Ident::new(
1170            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1171            Span::call_site(),
1172        );
1173
1174        let membership_ident = syn::Ident::new(
1175            &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1176            Span::call_site(),
1177        );
1178
1179        let bind_addr = format!("0.0.0.0:{}", p2_port);
1180
1181        extra_stmts.push(syn::parse_quote! {
1182            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1183        });
1184
1185        let root = crate::staging_util::get_this_crate();
1186
1187        extra_stmts.push(syn::parse_quote! {
1188            let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1189        });
1190
1191        parse_quote!(#source_ident)
1192    }
1193
1194    #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1195    fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1196        let sink_ident = syn::Ident::new(
1197            &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1198            Span::call_site(),
1199        );
1200        parse_quote!(#sink_ident)
1201    }
1202
1203    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1204    fn e2o_source(
1205        extra_stmts: &mut Vec<syn::Stmt>,
1206        p1: &Self::External,
1207        p1_port: &<Self::External as Node>::Port,
1208        p2: &Self::Process,
1209        p2_port: &<Self::Process as Node>::Port,
1210        _codec_type: &syn::Type,
1211        shared_handle: String,
1212    ) -> syn::Expr {
1213        p1.connection_info.borrow_mut().insert(
1214            *p1_port,
1215            (
1216                p2.docker_container_name.clone(),
1217                *p2_port,
1218                p2.network.clone(),
1219            ),
1220        );
1221
1222        p2.exposed_ports.borrow_mut().push(*p2_port);
1223
1224        let socket_ident = syn::Ident::new(
1225            &format!("__hydro_deploy_{}_socket", &shared_handle),
1226            Span::call_site(),
1227        );
1228
1229        let source_ident = syn::Ident::new(
1230            &format!("__hydro_deploy_{}_source", &shared_handle),
1231            Span::call_site(),
1232        );
1233
1234        let sink_ident = syn::Ident::new(
1235            &format!("__hydro_deploy_{}_sink", &shared_handle),
1236            Span::call_site(),
1237        );
1238
1239        let bind_addr = format!("0.0.0.0:{}", p2_port);
1240
1241        extra_stmts.push(syn::parse_quote! {
1242            let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1243        });
1244
1245        let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1246
1247        extra_stmts.push(syn::parse_quote! {
1248            let (#sink_ident, #source_ident) = (#create_expr).split();
1249        });
1250
1251        parse_quote!(#source_ident)
1252    }
1253
1254    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1255    fn e2o_connect(
1256        p1: &Self::External,
1257        p1_port: &<Self::External as Node>::Port,
1258        p2: &Self::Process,
1259        p2_port: &<Self::Process as Node>::Port,
1260        many: bool,
1261        server_hint: NetworkHint,
1262    ) -> Box<dyn FnOnce()> {
1263        if server_hint != NetworkHint::Auto {
1264            panic!(
1265                "Docker deployment only supports NetworkHint::Auto, got {:?}",
1266                server_hint
1267            );
1268        }
1269
1270        // For many connections, we need to populate connection_info so as_bincode_bidi can find it
1271        if many {
1272            p1.connection_info.borrow_mut().insert(
1273                *p1_port,
1274                (
1275                    p2.docker_container_name.clone(),
1276                    *p2_port,
1277                    p2.network.clone(),
1278                ),
1279            );
1280        }
1281
1282        let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1283
1284        Box::new(move || {
1285            trace!(name: "e2o_connect thunk", %serialized);
1286        })
1287    }
1288
1289    #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1290    fn o2e_sink(
1291        p1: &Self::Process,
1292        p1_port: &<Self::Process as Node>::Port,
1293        p2: &Self::External,
1294        p2_port: &<Self::External as Node>::Port,
1295        shared_handle: String,
1296    ) -> syn::Expr {
1297        let sink_ident = syn::Ident::new(
1298            &format!("__hydro_deploy_{}_sink", &shared_handle),
1299            Span::call_site(),
1300        );
1301        parse_quote!(#sink_ident)
1302    }
1303
1304    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
1305    fn cluster_ids(
1306        of_cluster: LocationKey,
1307    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
1308        cluster_ids()
1309    }
1310
1311    #[instrument(level = "trace", skip_all)]
1312    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
1313        cluster_self_id()
1314    }
1315
1316    #[instrument(level = "trace", skip_all, fields(?location_id))]
1317    fn cluster_membership_stream(
1318        _env: &mut Self::InstantiateEnv,
1319        _at_location: &LocationId,
1320        location_id: &LocationId,
1321    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
1322    {
1323        cluster_membership_stream(location_id)
1324    }
1325}
1326
1327const CONTAINER_ALPHABET: [char; 36] = [
1328    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',
1329    'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
1330];
1331
1332fn is_valid_docker_image_name(name: &str) -> bool {
1333    regex::Regex::new(r"^[a-z0-9]+([._-][a-z0-9]+)*$")
1334        .unwrap()
1335        .is_match(name)
1336}
1337
1338#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1339fn get_docker_image_name(
1340    name_hint: &str,
1341    location_key: LocationKey,
1342    deployment_instance: &str,
1343) -> String {
1344    let name_hint: String = name_hint
1345        .split("::")
1346        .last()
1347        .unwrap()
1348        .to_ascii_lowercase()
1349        .split(['.', '_', '-'])
1350        .filter(|s| !s.is_empty())
1351        .collect::<Vec<_>>()
1352        .join("-");
1353
1354    let image_name = format!("hy-{name_hint}-{deployment_instance}-{location_key}");
1355
1356    if !is_valid_docker_image_name(&image_name) {
1357        panic!(
1358            "Generated Docker image name '{image_name}' is not a valid Docker image name. \
1359             Docker image names may only contain lowercase alphanumeric characters \
1360             separated by single '.', '_', or '-' characters, and must start and end \
1361             with an alphanumeric character. The most likely cause is your location \
1362             struct name '{name_hint}'"
1363        );
1364    }
1365
1366    image_name
1367}
1368
1369#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1370fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1371    if let Some(instance) = instance {
1372        format!("{image_name}-{instance}")
1373    } else {
1374        image_name.to_owned()
1375    }
1376}
1377/// Represents a Process running in a docker container
1378#[derive(Clone)]
1379pub struct DockerDeployProcessSpec {
1380    compilation_options: Option<String>,
1381    config: Vec<String>,
1382    network: DockerNetwork,
1383    deployment_instance: String,
1384    base_image: Option<String>,
1385    linux_compile_type: LinuxCompileType,
1386    features: Vec<String>,
1387}
1388
1389impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1390    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1391    fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1392        DockerDeployProcess {
1393            key,
1394            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1395
1396            next_port: Rc::new(RefCell::new(1000)),
1397            rust_crate: Rc::new(RefCell::new(None)),
1398
1399            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1400
1401            docker_container_name: Rc::new(RefCell::new(None)),
1402
1403            compilation_options: self.compilation_options,
1404            config: self.config,
1405
1406            network: self.network.clone(),
1407
1408            base_image: self.base_image,
1409            linux_compile_type: self.linux_compile_type,
1410            features: self.features,
1411        }
1412    }
1413}
1414
1415/// Represents a Cluster running across `count` docker containers.
1416#[derive(Clone)]
1417pub struct DockerDeployClusterSpec {
1418    compilation_options: Option<String>,
1419    config: Vec<String>,
1420    count: usize,
1421    deployment_instance: String,
1422    base_image: Option<String>,
1423    linux_compile_type: LinuxCompileType,
1424    features: Vec<String>,
1425}
1426
1427impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1428    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1429    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1430        DockerDeployCluster {
1431            key,
1432            name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1433
1434            next_port: Rc::new(RefCell::new(1000)),
1435            rust_crate: Rc::new(RefCell::new(None)),
1436
1437            exposed_ports: Rc::new(RefCell::new(Vec::new())),
1438
1439            docker_container_name: Rc::new(RefCell::new(Vec::new())),
1440
1441            compilation_options: self.compilation_options,
1442            config: self.config,
1443
1444            count: self.count,
1445
1446            base_image: self.base_image,
1447            linux_compile_type: self.linux_compile_type,
1448            features: self.features,
1449        }
1450    }
1451}
1452
1453impl DockerDeployProcessSpec {
1454    /// Set the base Docker image for this process.
1455    /// Defaults to `scratch` if not specified.
1456    pub fn base_image(mut self, image: impl Into<String>) -> Self {
1457        self.base_image = Some(image.into());
1458        self
1459    }
1460
1461    /// Set the Linux compile type (glibc or musl) for this process.
1462    /// Defaults to `Musl` if not specified.
1463    pub fn linux_compile_type(mut self, compile_type: LinuxCompileType) -> Self {
1464        self.linux_compile_type = compile_type;
1465        self
1466    }
1467
1468    /// Add features to enable when compiling the final binary.
1469    pub fn features(mut self, features: impl IntoIterator<Item = impl Into<String>>) -> Self {
1470        self.features.extend(features.into_iter().map(Into::into));
1471        self
1472    }
1473
1474    /// Add a single feature to enable when compiling the final binary.
1475    pub fn feature(mut self, feature: impl Into<String>) -> Self {
1476        self.features.push(feature.into());
1477        self
1478    }
1479}
1480
1481impl DockerDeployClusterSpec {
1482    /// Set the base Docker image for this cluster.
1483    /// Defaults to `scratch` if not specified.
1484    pub fn base_image(mut self, image: impl Into<String>) -> Self {
1485        self.base_image = Some(image.into());
1486        self
1487    }
1488
1489    /// Set the Linux compile type (glibc or musl) for this cluster.
1490    /// Defaults to `Musl` if not specified.
1491    pub fn linux_compile_type(mut self, compile_type: LinuxCompileType) -> Self {
1492        self.linux_compile_type = compile_type;
1493        self
1494    }
1495
1496    /// Add features to enable when compiling the final binary.
1497    pub fn features(mut self, features: impl IntoIterator<Item = impl Into<String>>) -> Self {
1498        self.features.extend(features.into_iter().map(Into::into));
1499        self
1500    }
1501
1502    /// Add a single feature to enable when compiling the final binary.
1503    pub fn feature(mut self, feature: impl Into<String>) -> Self {
1504        self.features.push(feature.into());
1505        self
1506    }
1507}
1508
1509/// Represents an external process outside of the management of hydro deploy.
1510pub struct DockerDeployExternalSpec {
1511    name: String,
1512}
1513
1514impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1515    #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1516    fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1517        DockerDeployExternal {
1518            key,
1519            name: self.name,
1520            next_port: Rc::new(RefCell::new(10000)),
1521            next_external_port_id: Rc::new(RefCell::new(crate::Counter::default())),
1522            ports: Rc::new(RefCell::new(HashMap::new())),
1523            connection_info: Rc::new(RefCell::new(HashMap::new())),
1524        }
1525    }
1526}