1use std::cell::RefCell;
4use std::collections::HashMap;
5use std::pin::Pin;
6use std::rc::Rc;
7
8use bollard::Docker;
9use bollard::models::{ContainerCreateBody, EndpointSettings, HostConfig, NetworkCreateRequest};
10use bollard::query_parameters::{
11 BuildImageOptions, CreateContainerOptions, InspectContainerOptions, KillContainerOptions,
12 RemoveContainerOptions, StartContainerOptions,
13};
14use bollard::secret::NetworkingConfig;
15use bytes::Bytes;
16use dfir_lang::graph::DfirGraph;
17use futures::{Sink, SinkExt, Stream, StreamExt};
18use http_body_util::Full;
19pub use hydro_deploy::LinuxCompileType;
21use hydro_deploy::RustCrate;
22use hydro_deploy::rust_crate::build::{BuildError, build_crate_memoized};
23use nanoid::nanoid;
24use proc_macro2::Span;
25use sinktools::lazy::LazySink;
26use stageleft::QuotedWithContext;
27use syn::parse_quote;
28use tar::{Builder, Header};
29use tokio::net::TcpStream;
30use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
31use tracing::{Instrument, instrument, trace, warn};
32
33use super::deploy_runtime_containerized::*;
34use crate::compile::builder::ExternalPortId;
35use crate::compile::deploy::DeployResult;
36use crate::compile::deploy_provider::{
37 ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort,
38};
39use crate::compile::trybuild::generate::{LinkingMode, create_graph_trybuild};
40use crate::location::dynamic::LocationId;
41use crate::location::member_id::TaglessMemberId;
42use crate::location::{LocationKey, MembershipEvent, NetworkHint};
43
/// Handle naming the Docker network that a deployment's containers are attached to.
#[derive(Clone, Debug)]
pub struct DockerNetwork {
    /// Name of the network as passed to the Docker daemon.
    name: String,
}
49
50impl DockerNetwork {
51 pub fn new(name: String) -> Self {
53 Self {
54 name: format!("{name}-{}", nanoid::nanoid!(6, &CONTAINER_ALPHABET)),
55 }
56 }
57}
58
/// A single process that is built into a Docker image and run as one container.
///
/// Cloning is cheap for the `Rc` fields: clones share the same port counter, crate slot,
/// exposed-port list and container-name cell.
#[derive(Clone)]
pub struct DockerDeployProcess {
    /// Location key of this process; recorded in tracing spans.
    key: LocationKey,
    /// Process name; used as the trybuild binary name hint and as the Docker image tag.
    name: String,
    /// Counter backing [`Node::next_port`]; incremented on every allocation.
    next_port: Rc<RefCell<u16>>,
    /// Crate to compile; filled in by `instantiate` and taken when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Container ports that get an `EXPOSE` line in the generated Dockerfile.
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Name of the running container; `None` until the deployment has been started.
    docker_container_name: Rc<RefCell<Option<String>>>,

    /// Optional rustflags applied when compiling the crate.
    compilation_options: Option<String>,

    /// Extra cargo `--config` style entries applied to the crate build.
    config: Vec<String>,

    /// Docker network associated with this process.
    // NOTE(review): not read in the visible part of this file — presumably consumed elsewhere.
    network: DockerNetwork,

    /// Base image for the Dockerfile `FROM` line; `scratch` is used when `None`.
    base_image: Option<String>,

    /// Linux compile flavor handed to the build as the host target type.
    linux_compile_type: LinuxCompileType,

    /// Additional cargo features enabled on top of the generated ones.
    features: Vec<String>,
}
83
84impl Node for DockerDeployProcess {
85 type Port = u16;
86 type Meta = ();
87 type InstantiateEnv = DockerDeploy;
88
89 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
90 fn next_port(&self) -> Self::Port {
91 let port = {
92 let mut borrow = self.next_port.borrow_mut();
93 let port = *borrow;
94 *borrow += 1;
95 port
96 };
97
98 port
99 }
100
101 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
102 fn update_meta(&self, _meta: &Self::Meta) {}
103
104 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
105 fn instantiate(
106 &self,
107 _env: &mut Self::InstantiateEnv,
108 meta: &mut Self::Meta,
109 graph: DfirGraph,
110 extra_stmts: &[syn::Stmt],
111 sidecars: &[syn::Expr],
112 ) {
113 let (bin_name, config) = create_graph_trybuild(
114 graph,
115 extra_stmts,
116 sidecars,
117 Some(&self.name),
118 crate::compile::trybuild::generate::DeployMode::Containerized,
119 LinkingMode::Static,
120 );
121
122 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
123 .target_dir(config.target_dir)
124 .example(bin_name)
125 .no_default_features();
126
127 ret = ret.display_name("test_display_name");
128
129 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
130
131 if let Some(features) = config.features {
132 ret = ret.features(features);
133 }
134
135 if !self.features.is_empty() {
136 ret = ret.features(self.features.clone());
137 }
138
139 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
140 ret = ret.config("build.incremental = false");
141
142 *self.rust_crate.borrow_mut() = Some(ret);
143 }
144}
145
/// A cluster of identical members, built into one Docker image and run as `count` containers.
///
/// Clones share the `Rc` fields (port counter, crate slot, exposed ports, container names).
#[derive(Clone)]
pub struct DockerDeployCluster {
    /// Location key of this cluster; recorded in tracing spans.
    key: LocationKey,
    /// Cluster name; used as the trybuild binary name hint and as the Docker image tag.
    name: String,
    /// Counter backing [`Node::next_port`]; incremented on every allocation.
    next_port: Rc<RefCell<u16>>,
    /// Crate to compile; filled in by `instantiate` and taken when the image is built.
    rust_crate: Rc<RefCell<Option<RustCrate>>>,

    /// Container ports that get an `EXPOSE` line in the generated Dockerfile.
    exposed_ports: Rc<RefCell<Vec<u16>>>,

    /// Names of the started containers, one pushed per member when the deployment starts.
    docker_container_name: Rc<RefCell<Vec<String>>>,

    /// Optional rustflags applied when compiling the crate.
    compilation_options: Option<String>,

    /// Extra cargo `--config` style entries applied to the crate build.
    config: Vec<String>,

    /// Number of containers to start for this cluster.
    count: usize,

    /// Base image for the Dockerfile `FROM` line; `scratch` is used when `None`.
    base_image: Option<String>,

    /// Linux compile flavor handed to the build as the host target type.
    linux_compile_type: LinuxCompileType,

    /// Additional cargo features enabled on top of the generated ones.
    features: Vec<String>,
}
170
171impl Node for DockerDeployCluster {
172 type Port = u16;
173 type Meta = ();
174 type InstantiateEnv = DockerDeploy;
175
176 #[instrument(level = "trace", skip_all, ret, fields(key = %self.key, name = self.name))]
177 fn next_port(&self) -> Self::Port {
178 let port = {
179 let mut borrow = self.next_port.borrow_mut();
180 let port = *borrow;
181 *borrow += 1;
182 port
183 };
184
185 port
186 }
187
188 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name))]
189 fn update_meta(&self, _meta: &Self::Meta) {}
190
191 #[instrument(level = "trace", skip_all, fields(key = %self.key, name = self.name, extra_stmts = extra_stmts.len()))]
192 fn instantiate(
193 &self,
194 _env: &mut Self::InstantiateEnv,
195 _meta: &mut Self::Meta,
196 graph: DfirGraph,
197 extra_stmts: &[syn::Stmt],
198 sidecars: &[syn::Expr],
199 ) {
200 let (bin_name, config) = create_graph_trybuild(
201 graph,
202 extra_stmts,
203 sidecars,
204 Some(&self.name),
205 crate::compile::trybuild::generate::DeployMode::Containerized,
206 LinkingMode::Static,
207 );
208
209 let mut ret = RustCrate::new(&config.project_dir, &config.project_dir)
210 .target_dir(config.target_dir)
211 .example(bin_name)
212 .no_default_features();
213
214 ret = ret.display_name("test_display_name");
215
216 ret = ret.features(vec!["hydro___feature_docker_runtime".to_owned()]);
217
218 if let Some(features) = config.features {
219 ret = ret.features(features);
220 }
221
222 if !self.features.is_empty() {
223 ret = ret.features(self.features.clone());
224 }
225
226 ret = ret.build_env("STAGELEFT_TRYBUILD_BUILD_STAGED", "1");
227 ret = ret.config("build.incremental = false");
228
229 *self.rust_crate.borrow_mut() = Some(ret);
230 }
231}
232
/// An external (non-containerized) endpoint that connects to containers over TCP.
#[derive(Clone, Debug)]
#[expect(
    dead_code,
    reason = "fields used via Rc<RefCell> in RegisterPort impl and ExternalBytesPort construction"
)]
pub struct DockerDeployExternal {
    /// Location key of this external node.
    pub(crate) key: LocationKey,
    /// Name of the external node; recorded in tracing spans.
    name: String,
    /// Counter backing [`Node::next_port`]; incremented on every allocation.
    next_port: Rc<RefCell<u16>>,

    /// Counter for external port ids.
    // NOTE(review): not read in the visible part of this file — presumably used when ports
    // are created elsewhere.
    next_external_port_id: Rc<RefCell<crate::Counter<ExternalPortId>>>,

    /// Maps each registered external port id to the local port it was registered with.
    ports: Rc<RefCell<HashMap<ExternalPortId, u16>>>,

    /// Per local port: the (lazily filled) container name cell, the port inside that
    /// container, and the network the container lives on.
    connection_info: Rc<RefCell<HashMap<u16, (Rc<RefCell<Option<String>>>, u16, DockerNetwork)>>>,
}
252
253impl Node for DockerDeployExternal {
254 type Port = u16;
255 type Meta = ();
256 type InstantiateEnv = DockerDeploy;
257
258 #[instrument(level = "trace", skip_all, ret, fields(name = self.name))]
259 fn next_port(&self) -> Self::Port {
260 let port = {
261 let mut borrow = self.next_port.borrow_mut();
262 let port = *borrow;
263 *borrow += 1;
264 port
265 };
266
267 port
268 }
269
270 #[instrument(level = "trace", skip_all, fields(name = self.name))]
271 fn update_meta(&self, _meta: &Self::Meta) {}
272
273 #[instrument(level = "trace", skip_all, fields(name = self.name, ?meta, extra_stmts = extra_stmts.len(), sidecars = sidecars.len()))]
274 fn instantiate(
275 &self,
276 _env: &mut Self::InstantiateEnv,
277 meta: &mut Self::Meta,
278 graph: DfirGraph,
279 extra_stmts: &[syn::Stmt],
280 sidecars: &[syn::Expr],
281 ) {
282 trace!(name: "surface", surface = graph.surface_syntax_string());
283 }
284}
285
286impl DockerDeployProcess {
287 pub fn expose_port(&self, port: u16) {
293 self.exposed_ports.borrow_mut().push(port);
294 }
295
296 pub async fn get_tcp_endpoint(&self, container_port: u16) -> (String, u16) {
300 let name = self
301 .docker_container_name
302 .borrow()
303 .as_ref()
304 .expect("container not yet started")
305 .clone();
306 let host_port = find_dynamically_allocated_docker_port(&name, container_port).await;
307 ("localhost".to_owned(), host_port)
308 }
309}
310
311impl DockerDeployCluster {
312 pub fn expose_port(&self, port: u16) {
318 self.exposed_ports.borrow_mut().push(port);
319 }
320
321 pub async fn get_all_tcp_endpoints(&self, container_port: u16) -> Vec<(String, u16)> {
325 let names = self.docker_container_name.borrow().clone();
326 let mut endpoints = Vec::with_capacity(names.len());
327 for name in names {
328 let host_port = find_dynamically_allocated_docker_port(&name, container_port).await;
329 endpoints.push(("localhost".to_owned(), host_port));
330 }
331 endpoints
332 }
333}
334
/// A boxed, pinned (stream, sink) pair: the stream yields `Out` items and the sink accepts
/// `In` items, failing with `InErr`.
type DynSourceSink<Out, In, InErr> = (
    Pin<Box<dyn Stream<Item = Out>>>,
    Pin<Box<dyn Sink<In, Error = InErr>>>,
);
339
340impl<'a> RegisterPort<'a, DockerDeploy> for DockerDeployExternal {
341 #[instrument(level = "trace", skip_all, fields(name = self.name, %external_port_id, %port))]
342 fn register(&self, external_port_id: ExternalPortId, port: Self::Port) {
343 self.ports.borrow_mut().insert(external_port_id, port);
344 }
345
346 fn as_bytes_bidi(
347 &self,
348 external_port_id: ExternalPortId,
349 ) -> impl Future<
350 Output = DynSourceSink<Result<bytes::BytesMut, std::io::Error>, Bytes, std::io::Error>,
351 > + 'a {
352 let guard =
353 tracing::trace_span!("as_bytes_bidi", name = %self.name, %external_port_id).entered();
354
355 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
356 let (docker_container_name, remote_port, _) = self
357 .connection_info
358 .borrow()
359 .get(&local_port)
360 .unwrap()
361 .clone();
362
363 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
364
365 async move {
366 let local_port =
367 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
368 let remote_ip_address = "localhost";
369
370 trace!(name: "as_bytes_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
371
372 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
373 .await
374 .unwrap();
375
376 trace!(name: "as_bytes_bidi_connected", to = %remote_ip_address, to_port = %local_port);
377
378 let (rx, tx) = stream.into_split();
379
380 let source = Box::pin(
381 FramedRead::new(rx, LengthDelimitedCodec::new()),
382 ) as Pin<Box<dyn Stream<Item = Result<bytes::BytesMut, std::io::Error>>>>;
383
384 let sink = Box::pin(FramedWrite::new(tx, LengthDelimitedCodec::new()))
385 as Pin<Box<dyn Sink<Bytes, Error = std::io::Error>>>;
386
387 (source, sink)
388 }
389 .instrument(guard.exit())
390 }
391
392 fn as_bincode_bidi<InT, OutT>(
393 &self,
394 external_port_id: ExternalPortId,
395 ) -> impl Future<Output = DynSourceSink<OutT, InT, std::io::Error>> + 'a
396 where
397 InT: serde::Serialize + 'static,
398 OutT: serde::de::DeserializeOwned + 'static,
399 {
400 let guard =
401 tracing::trace_span!("as_bincode_bidi", name = %self.name, %external_port_id).entered();
402
403 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
404 let (docker_container_name, remote_port, _) = self
405 .connection_info
406 .borrow()
407 .get(&local_port)
408 .unwrap()
409 .clone();
410
411 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
412
413 async move {
414 let local_port =
415 find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
416 let remote_ip_address = "localhost";
417
418 trace!(name: "as_bincode_bidi_connecting", to = %remote_ip_address, to_port = %local_port);
419
420 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
421 .await
422 .unwrap();
423
424 trace!(name: "as_bincode_bidi_connected", to = %remote_ip_address, to_port = %local_port);
425
426 let (rx, tx) = stream.into_split();
427
428 let source = Box::pin(
429 FramedRead::new(rx, LengthDelimitedCodec::new())
430 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
431 ) as Pin<Box<dyn Stream<Item = OutT>>>;
432
433 let sink = Box::pin(
434 FramedWrite::new(tx, LengthDelimitedCodec::new()).with(move |v: InT| async move {
435 Ok::<_, std::io::Error>(Bytes::from(bincode::serialize(&v).unwrap()))
436 }),
437 ) as Pin<Box<dyn Sink<InT, Error = std::io::Error>>>;
438
439 (source, sink)
440 }
441 .instrument(guard.exit())
442 }
443
444 fn as_bincode_sink<T>(
445 &self,
446 external_port_id: ExternalPortId,
447 ) -> impl Future<Output = Pin<Box<dyn Sink<T, Error = std::io::Error>>>> + 'a
448 where
449 T: serde::Serialize + 'static,
450 {
451 let guard =
452 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
453
454 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
455 let (docker_container_name, remote_port, _) = self
456 .connection_info
457 .borrow()
458 .get(&local_port)
459 .unwrap()
460 .clone();
461
462 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
463
464 async move {
465 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
466 let remote_ip_address = "localhost";
467
468 Box::pin(
469 LazySink::new(move || {
470 Box::pin(async move {
471 trace!(name: "as_bincode_sink_connecting", to = %remote_ip_address, to_port = %local_port);
472
473 let stream =
474 TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
475 .await?;
476
477 trace!(name: "as_bincode_sink_connected", to = %remote_ip_address, to_port = %local_port);
478
479 Result::<_, std::io::Error>::Ok(FramedWrite::new(
480 stream,
481 LengthDelimitedCodec::new(),
482 ))
483 })
484 })
485 .with(move |v| async move {
486 Ok(Bytes::from(bincode::serialize(&v).unwrap()))
487 }),
488 ) as Pin<Box<dyn Sink<T, Error = std::io::Error>>>
489 }
490 .instrument(guard.exit())
491 }
492
493 fn as_bincode_source<T>(
494 &self,
495 external_port_id: ExternalPortId,
496 ) -> impl Future<Output = Pin<Box<dyn Stream<Item = T>>>> + 'a
497 where
498 T: serde::de::DeserializeOwned + 'static,
499 {
500 let guard =
501 tracing::trace_span!("as_bincode_sink", name = %self.name, %external_port_id).entered();
502
503 let local_port = *self.ports.borrow().get(&external_port_id).unwrap();
504 let (docker_container_name, remote_port, _) = self
505 .connection_info
506 .borrow()
507 .get(&local_port)
508 .unwrap()
509 .clone();
510
511 let docker_container_name = docker_container_name.borrow().as_ref().unwrap().clone();
512
513 async move {
514
515 let local_port = find_dynamically_allocated_docker_port(&docker_container_name, remote_port).await;
516 let remote_ip_address = "localhost";
517
518 trace!(name: "as_bincode_source_connecting", to = %remote_ip_address, to_port = %local_port);
519
520 let stream = TcpStream::connect(format!("{remote_ip_address}:{local_port}"))
521 .await
522 .unwrap();
523
524 trace!(name: "as_bincode_source_connected", to = %remote_ip_address, to_port = %local_port);
525
526 Box::pin(
527 FramedRead::new(stream, LengthDelimitedCodec::new())
528 .map(|v| bincode::deserialize(&v.unwrap()).unwrap()),
529 ) as Pin<Box<dyn Stream<Item = T>>>
530 }
531 .instrument(guard.exit())
532 }
533}
534
535#[instrument(level = "trace", skip_all, fields(%docker_container_name, %destination_port))]
536async fn find_dynamically_allocated_docker_port(
537 docker_container_name: &str,
538 destination_port: u16,
539) -> u16 {
540 let docker = Docker::connect_with_local_defaults().unwrap();
541
542 let container_info = docker
543 .inspect_container(docker_container_name, None::<InspectContainerOptions>)
544 .await
545 .unwrap();
546
547 trace!(name: "port struct", container_info = ?container_info.network_settings.as_ref().unwrap().ports.as_ref().unwrap());
548
549 let remote_port = container_info
551 .network_settings
552 .as_ref()
553 .unwrap()
554 .ports
555 .as_ref()
556 .unwrap()
557 .get(&format!("{destination_port}/tcp"))
558 .unwrap()
559 .as_ref()
560 .unwrap()
561 .iter()
562 .find(|v| v.host_ip == Some("0.0.0.0".to_owned()))
563 .unwrap()
564 .host_port
565 .as_ref()
566 .unwrap()
567 .parse()
568 .unwrap();
569
570 remote_port
571}
572
/// Deployment backend that builds Docker images for processes and clusters and runs them as
/// containers on a shared Docker network.
pub struct DockerDeploy {
    /// Specs of every process added through `add_localhost_docker`.
    docker_processes: Vec<DockerDeployProcessSpec>,
    /// Specs of every cluster added through `add_localhost_docker_cluster`.
    docker_clusters: Vec<DockerDeployClusterSpec>,
    /// Network that is created on `start` and removed on `cleanup`.
    network: DockerNetwork,
    /// Random identifier of this deployment, passed to containers as `DEPLOYMENT_INSTANCE`.
    deployment_instance: String,
}
580
581#[instrument(level = "trace", skip_all, fields(%image_name, %container_name, %network_name, %deployment_instance))]
582async fn create_and_start_container(
583 docker: &Docker,
584 container_name: &str,
585 image_name: &str,
586 network_name: &str,
587 deployment_instance: &str,
588) -> Result<(), anyhow::Error> {
589 let config = ContainerCreateBody {
590 image: Some(image_name.to_owned()),
591 hostname: Some(container_name.to_owned()),
592 host_config: Some(HostConfig {
593 binds: Some(vec!["/var/run/docker.sock:/var/run/docker.sock".to_owned()]),
594 publish_all_ports: Some(true),
595 port_bindings: Some(HashMap::new()), ..Default::default()
597 }),
598 env: Some(vec![
599 format!("CONTAINER_NAME={container_name}"),
600 format!("DEPLOYMENT_INSTANCE={deployment_instance}"),
601 format!("RUST_LOG=trace"),
602 ]),
603 networking_config: Some(NetworkingConfig {
604 endpoints_config: Some(HashMap::from([(
605 network_name.to_owned(),
606 EndpointSettings {
607 ..Default::default()
608 },
609 )])),
610 }),
611 tty: Some(true),
612 ..Default::default()
613 };
614
615 let options = CreateContainerOptions {
616 name: Some(container_name.to_owned()),
617 ..Default::default()
618 };
619
620 tracing::error!("Config: {}", serde_json::to_string_pretty(&config).unwrap());
621 docker.create_container(Some(options), config).await?;
622 docker
623 .start_container(container_name, None::<StartContainerOptions>)
624 .await?;
625
626 Ok(())
627}
628
629#[instrument(level = "trace", skip_all, fields(%image_name))]
630async fn build_and_create_image(
631 rust_crate: &Rc<RefCell<Option<RustCrate>>>,
632 compilation_options: Option<&str>,
633 config: &[String],
634 exposed_ports: &[u16],
635 image_name: &str,
636 base_image: Option<&str>,
637 linux_compile_type: LinuxCompileType,
638) -> Result<(), anyhow::Error> {
639 let mut rust_crate = rust_crate
640 .borrow_mut()
641 .take()
642 .unwrap()
643 .rustflags(compilation_options.unwrap_or_default());
644
645 for cfg in config {
646 rust_crate = rust_crate.config(cfg);
647 }
648
649 let build_output = match build_crate_memoized(
650 rust_crate.get_build_params(hydro_deploy::HostTargetType::Linux(linux_compile_type)),
651 )
652 .await
653 {
654 Ok(build_output) => build_output,
655 Err(BuildError::FailedToBuildCrate {
656 exit_status,
657 diagnostics,
658 text_lines,
659 stderr_lines,
660 }) => {
661 let diagnostics = diagnostics
662 .into_iter()
663 .map(|d| d.rendered.unwrap())
664 .collect::<Vec<_>>()
665 .join("\n");
666 let text_lines = text_lines.join("\n");
667 let stderr_lines = stderr_lines.join("\n");
668
669 anyhow::bail!(
670 r#"
671Failed to build crate {exit_status:?}
672--- diagnostics
673---
674{diagnostics}
675---
676---
677---
678
679--- text_lines
680---
681---
682{text_lines}
683---
684---
685---
686
687--- stderr_lines
688---
689---
690{stderr_lines}
691---
692---
693---"#
694 );
695 }
696 Err(err) => {
697 anyhow::bail!("Failed to build crate {err:?}");
698 }
699 };
700
701 let docker = Docker::connect_with_local_defaults()?;
702
703 let mut tar_data = Vec::new();
704 {
705 let mut tar = Builder::new(&mut tar_data);
706
707 let exposed_ports = exposed_ports
708 .iter()
709 .map(|port| format!("EXPOSE {port}/tcp"))
710 .collect::<Vec<_>>()
711 .join("\n");
712
713 let from_image = base_image.unwrap_or("scratch");
714 let dockerfile_content = format!(
715 r#"
716 FROM {from_image}
717 {exposed_ports}
718 COPY app /app
719 CMD ["/app"]
720 "#,
721 );
722
723 trace!(name: "dockerfile", %dockerfile_content);
724
725 let mut header = Header::new_gnu();
726 header.set_path("Dockerfile")?;
727 header.set_size(dockerfile_content.len() as u64);
728 header.set_cksum();
729 tar.append(&header, dockerfile_content.as_bytes())?;
730
731 let mut header = Header::new_gnu();
732 header.set_path("app")?;
733 header.set_size(build_output.bin_data.len() as u64);
734 header.set_mode(0o755);
735 header.set_cksum();
736 tar.append(&header, &build_output.bin_data[..])?;
737
738 tar.finish()?;
739 }
740
741 let build_options = BuildImageOptions {
742 dockerfile: "Dockerfile".to_owned(),
743 t: Some(image_name.to_owned()),
744 rm: true,
745 ..Default::default()
746 };
747
748 use bollard::errors::Error;
749
750 let body = http_body_util::Either::Left(Full::new(Bytes::from(tar_data)));
751 let mut build_stream = docker.build_image(build_options, None, Some(body));
752 while let Some(msg) = build_stream.next().await {
753 match msg {
754 Ok(_) => {}
755 Err(e) => match e {
756 Error::DockerStreamError { error } => {
757 return Err(anyhow::anyhow!(
758 "Docker build failed: DockerStreamError: {{ error: {error} }}"
759 ));
760 }
761 _ => return Err(anyhow::anyhow!("Docker build failed: {}", e)),
762 },
763 }
764 }
765
766 Ok(())
767}
768
769impl DockerDeploy {
770 pub fn new(network: DockerNetwork) -> Self {
772 Self {
773 docker_processes: Vec::new(),
774 docker_clusters: Vec::new(),
775 network,
776 deployment_instance: nanoid!(6, &CONTAINER_ALPHABET),
777 }
778 }
779
780 pub fn add_localhost_docker(
782 &mut self,
783 compilation_options: Option<String>,
784 config: Vec<String>,
785 ) -> DockerDeployProcessSpec {
786 let process = DockerDeployProcessSpec {
787 compilation_options,
788 config,
789 network: self.network.clone(),
790 deployment_instance: self.deployment_instance.clone(),
791 base_image: None,
792 linux_compile_type: LinuxCompileType::Musl,
793 features: vec![],
794 };
795
796 self.docker_processes.push(process.clone());
797
798 process
799 }
800
801 pub fn add_localhost_docker_cluster(
803 &mut self,
804 compilation_options: Option<String>,
805 config: Vec<String>,
806 count: usize,
807 ) -> DockerDeployClusterSpec {
808 let cluster = DockerDeployClusterSpec {
809 compilation_options,
810 config,
811 count,
812 deployment_instance: self.deployment_instance.clone(),
813 base_image: None,
814 linux_compile_type: LinuxCompileType::Musl,
815 features: vec![],
816 };
817
818 self.docker_clusters.push(cluster.clone());
819
820 cluster
821 }
822
823 pub fn add_external(&self, name: String) -> DockerDeployExternalSpec {
825 DockerDeployExternalSpec { name }
826 }
827
828 pub fn get_deployment_instance(&self) -> String {
830 self.deployment_instance.clone()
831 }
832
833 #[instrument(level = "trace", skip_all)]
835 pub async fn provision(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
836 for (_, _, process) in nodes.get_all_processes() {
837 let exposed_ports = process.exposed_ports.borrow().clone();
838
839 build_and_create_image(
840 &process.rust_crate,
841 process.compilation_options.as_deref(),
842 &process.config,
843 &exposed_ports,
844 &process.name,
845 process.base_image.as_deref(),
846 process.linux_compile_type,
847 )
848 .await?;
849 }
850
851 for (_, _, cluster) in nodes.get_all_clusters() {
852 let exposed_ports = cluster.exposed_ports.borrow().clone();
853 build_and_create_image(
854 &cluster.rust_crate,
855 cluster.compilation_options.as_deref(),
856 &cluster.config,
857 &exposed_ports,
858 &cluster.name,
859 cluster.base_image.as_deref(),
860 cluster.linux_compile_type,
861 )
862 .await?;
863 }
864
865 Ok(())
866 }
867
868 #[instrument(level = "trace", skip_all)]
870 pub async fn start(&self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
871 let docker = Docker::connect_with_local_defaults()?;
872
873 match docker
874 .create_network(NetworkCreateRequest {
875 name: self.network.name.clone(),
876 driver: Some("bridge".to_owned()),
877 ..Default::default()
878 })
879 .await
880 {
881 Ok(v) => v.id,
882 Err(e) => {
883 panic!("Failed to create docker network: {e:?}");
884 }
885 };
886
887 for (_, _, process) in nodes.get_all_processes() {
888 let docker_container_name: String = get_docker_container_name(&process.name, None);
889 *process.docker_container_name.borrow_mut() = Some(docker_container_name.clone());
890
891 create_and_start_container(
892 &docker,
893 &docker_container_name,
894 &process.name,
895 &self.network.name,
896 &self.deployment_instance,
897 )
898 .await?;
899 }
900
901 for (_, _, cluster) in nodes.get_all_clusters() {
902 for num in 0..cluster.count {
903 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
904 cluster
905 .docker_container_name
906 .borrow_mut()
907 .push(docker_container_name.clone());
908
909 create_and_start_container(
910 &docker,
911 &docker_container_name,
912 &cluster.name,
913 &self.network.name,
914 &self.deployment_instance,
915 )
916 .await?;
917 }
918 }
919
920 Ok(())
921 }
922
923 #[instrument(level = "trace", skip_all)]
925 pub async fn stop(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
926 let docker = Docker::connect_with_local_defaults()?;
927
928 for (_, _, process) in nodes.get_all_processes() {
929 let docker_container_name: String = get_docker_container_name(&process.name, None);
930
931 docker
932 .kill_container(&docker_container_name, None::<KillContainerOptions>)
933 .await?;
934 }
935
936 for (_, _, cluster) in nodes.get_all_clusters() {
937 for num in 0..cluster.count {
938 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
939
940 docker
941 .kill_container(&docker_container_name, None::<KillContainerOptions>)
942 .await?;
943 }
944 }
945
946 Ok(())
947 }
948
949 #[instrument(level = "trace", skip_all)]
951 pub async fn cleanup(&mut self, nodes: &DeployResult<'_, Self>) -> Result<(), anyhow::Error> {
952 let docker = Docker::connect_with_local_defaults()?;
953
954 for (_, _, process) in nodes.get_all_processes() {
955 let docker_container_name: String = get_docker_container_name(&process.name, None);
956
957 docker
958 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
959 .await?;
960 }
961
962 for (_, _, cluster) in nodes.get_all_clusters() {
963 for num in 0..cluster.count {
964 let docker_container_name = get_docker_container_name(&cluster.name, Some(num));
965
966 docker
967 .remove_container(&docker_container_name, None::<RemoveContainerOptions>)
968 .await?;
969 }
970 }
971
972 docker
973 .remove_network(&self.network.name)
974 .await
975 .map_err(|e| anyhow::anyhow!("Failed to remove docker network: {e:?}"))?;
976
977 use bollard::query_parameters::RemoveImageOptions;
978
979 for (_, _, process) in nodes.get_all_processes() {
980 docker
981 .remove_image(&process.name, None::<RemoveImageOptions>, None)
982 .await?;
983 }
984
985 for (_, _, cluster) in nodes.get_all_clusters() {
986 docker
987 .remove_image(&cluster.name, None::<RemoveImageOptions>, None)
988 .await?;
989 }
990
991 Ok(())
992 }
993}
994
995impl<'a> Deploy<'a> for DockerDeploy {
996 type Meta = ();
997 type InstantiateEnv = Self;
998
999 type Process = DockerDeployProcess;
1000 type Cluster = DockerDeployCluster;
1001 type External = DockerDeployExternal;
1002
1003 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
1004 fn o2o_sink_source(
1005 _env: &mut Self::InstantiateEnv,
1006 p1: &Self::Process,
1007 p1_port: &<Self::Process as Node>::Port,
1008 p2: &Self::Process,
1009 p2_port: &<Self::Process as Node>::Port,
1010 name: Option<&str>,
1011 networking_info: &crate::networking::NetworkingInfo,
1012 ) -> (syn::Expr, syn::Expr) {
1013 match networking_info {
1014 crate::networking::NetworkingInfo::Tcp {
1015 fault: crate::networking::TcpFault::FailStop,
1016 } => {}
1017 _ => panic!("Unsupported networking info: {:?}", networking_info),
1018 }
1019
1020 deploy_containerized_o2o(
1021 &p2.name,
1022 name.expect("channel name is required for containerized deployment"),
1023 )
1024 }
1025
1026 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port))]
1027 fn o2o_connect(
1028 p1: &Self::Process,
1029 p1_port: &<Self::Process as Node>::Port,
1030 p2: &Self::Process,
1031 p2_port: &<Self::Process as Node>::Port,
1032 ) -> Box<dyn FnOnce()> {
1033 let serialized = format!("o2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1034
1035 Box::new(move || {
1036 trace!(name: "o2o_connect thunk", %serialized);
1037 })
1038 }
1039
1040 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1041 fn o2m_sink_source(
1042 _env: &mut Self::InstantiateEnv,
1043 p1: &Self::Process,
1044 p1_port: &<Self::Process as Node>::Port,
1045 c2: &Self::Cluster,
1046 c2_port: &<Self::Cluster as Node>::Port,
1047 name: Option<&str>,
1048 networking_info: &crate::networking::NetworkingInfo,
1049 ) -> (syn::Expr, syn::Expr) {
1050 match networking_info {
1051 crate::networking::NetworkingInfo::Tcp {
1052 fault: crate::networking::TcpFault::FailStop,
1053 } => {}
1054 _ => panic!("Unsupported networking info: {:?}", networking_info),
1055 }
1056
1057 deploy_containerized_o2m(
1058 name.expect("channel name is required for containerized deployment"),
1059 )
1060 }
1061
1062 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, c2 = c2.name, %c2_port))]
1063 fn o2m_connect(
1064 p1: &Self::Process,
1065 p1_port: &<Self::Process as Node>::Port,
1066 c2: &Self::Cluster,
1067 c2_port: &<Self::Cluster as Node>::Port,
1068 ) -> Box<dyn FnOnce()> {
1069 let serialized = format!("o2m_connect {}:{p1_port} -> {}:{c2_port}", p1.name, c2.name);
1070
1071 Box::new(move || {
1072 trace!(name: "o2m_connect thunk", %serialized);
1073 })
1074 }
1075
1076 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1077 fn m2o_sink_source(
1078 _env: &mut Self::InstantiateEnv,
1079 c1: &Self::Cluster,
1080 c1_port: &<Self::Cluster as Node>::Port,
1081 p2: &Self::Process,
1082 p2_port: &<Self::Process as Node>::Port,
1083 name: Option<&str>,
1084 networking_info: &crate::networking::NetworkingInfo,
1085 ) -> (syn::Expr, syn::Expr) {
1086 match networking_info {
1087 crate::networking::NetworkingInfo::Tcp {
1088 fault: crate::networking::TcpFault::FailStop,
1089 } => {}
1090 _ => panic!("Unsupported networking info: {:?}", networking_info),
1091 }
1092
1093 deploy_containerized_m2o(
1094 &p2.name,
1095 name.expect("channel name is required for containerized deployment"),
1096 )
1097 }
1098
1099 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, p2 = p2.name, %p2_port))]
1100 fn m2o_connect(
1101 c1: &Self::Cluster,
1102 c1_port: &<Self::Cluster as Node>::Port,
1103 p2: &Self::Process,
1104 p2_port: &<Self::Process as Node>::Port,
1105 ) -> Box<dyn FnOnce()> {
1106 let serialized = format!("o2m_connect {}:{c1_port} -> {}:{p2_port}", c1.name, p2.name);
1107
1108 Box::new(move || {
1109 trace!(name: "m2o_connect thunk", %serialized);
1110 })
1111 }
1112
1113 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1114 fn m2m_sink_source(
1115 _env: &mut Self::InstantiateEnv,
1116 c1: &Self::Cluster,
1117 c1_port: &<Self::Cluster as Node>::Port,
1118 c2: &Self::Cluster,
1119 c2_port: &<Self::Cluster as Node>::Port,
1120 name: Option<&str>,
1121 networking_info: &crate::networking::NetworkingInfo,
1122 ) -> (syn::Expr, syn::Expr) {
1123 match networking_info {
1124 crate::networking::NetworkingInfo::Tcp {
1125 fault: crate::networking::TcpFault::FailStop,
1126 } => {}
1127 _ => panic!("Unsupported networking info: {:?}", networking_info),
1128 }
1129
1130 deploy_containerized_m2m(
1131 name.expect("channel name is required for containerized deployment"),
1132 )
1133 }
1134
1135 #[instrument(level = "trace", skip_all, fields(c1 = c1.name, %c1_port, c2 = c2.name, %c2_port))]
1136 fn m2m_connect(
1137 c1: &Self::Cluster,
1138 c1_port: &<Self::Cluster as Node>::Port,
1139 c2: &Self::Cluster,
1140 c2_port: &<Self::Cluster as Node>::Port,
1141 ) -> Box<dyn FnOnce()> {
1142 let serialized = format!("m2m_connect {}:{c1_port} -> {}:{c2_port}", c1.name, c2.name);
1143
1144 Box::new(move || {
1145 trace!(name: "m2m_connect thunk", %serialized);
1146 })
1147 }
1148
1149 #[instrument(level = "trace", skip_all, fields(p2 = p2.name, %p2_port, %shared_handle, extra_stmts = extra_stmts.len()))]
1150 fn e2o_many_source(
1151 extra_stmts: &mut Vec<syn::Stmt>,
1152 p2: &Self::Process,
1153 p2_port: &<Self::Process as Node>::Port,
1154 codec_type: &syn::Type,
1155 shared_handle: String,
1156 ) -> syn::Expr {
1157 p2.exposed_ports.borrow_mut().push(*p2_port);
1158
1159 let socket_ident = syn::Ident::new(
1160 &format!("__hydro_deploy_many_{}_socket", &shared_handle),
1161 Span::call_site(),
1162 );
1163
1164 let source_ident = syn::Ident::new(
1165 &format!("__hydro_deploy_many_{}_source", &shared_handle),
1166 Span::call_site(),
1167 );
1168
1169 let sink_ident = syn::Ident::new(
1170 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1171 Span::call_site(),
1172 );
1173
1174 let membership_ident = syn::Ident::new(
1175 &format!("__hydro_deploy_many_{}_membership", &shared_handle),
1176 Span::call_site(),
1177 );
1178
1179 let bind_addr = format!("0.0.0.0:{}", p2_port);
1180
1181 extra_stmts.push(syn::parse_quote! {
1182 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1183 });
1184
1185 let root = crate::staging_util::get_this_crate();
1186
1187 extra_stmts.push(syn::parse_quote! {
1188 let (#source_ident, #sink_ident, #membership_ident) = #root::runtime_support::hydro_deploy_integration::multi_connection::tcp_multi_connection::<_, #codec_type>(#socket_ident);
1189 });
1190
1191 parse_quote!(#source_ident)
1192 }
1193
1194 #[instrument(level = "trace", skip_all, fields(%shared_handle))]
1195 fn e2o_many_sink(shared_handle: String) -> syn::Expr {
1196 let sink_ident = syn::Ident::new(
1197 &format!("__hydro_deploy_many_{}_sink", &shared_handle),
1198 Span::call_site(),
1199 );
1200 parse_quote!(#sink_ident)
1201 }
1202
1203 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1204 fn e2o_source(
1205 extra_stmts: &mut Vec<syn::Stmt>,
1206 p1: &Self::External,
1207 p1_port: &<Self::External as Node>::Port,
1208 p2: &Self::Process,
1209 p2_port: &<Self::Process as Node>::Port,
1210 _codec_type: &syn::Type,
1211 shared_handle: String,
1212 ) -> syn::Expr {
1213 p1.connection_info.borrow_mut().insert(
1214 *p1_port,
1215 (
1216 p2.docker_container_name.clone(),
1217 *p2_port,
1218 p2.network.clone(),
1219 ),
1220 );
1221
1222 p2.exposed_ports.borrow_mut().push(*p2_port);
1223
1224 let socket_ident = syn::Ident::new(
1225 &format!("__hydro_deploy_{}_socket", &shared_handle),
1226 Span::call_site(),
1227 );
1228
1229 let source_ident = syn::Ident::new(
1230 &format!("__hydro_deploy_{}_source", &shared_handle),
1231 Span::call_site(),
1232 );
1233
1234 let sink_ident = syn::Ident::new(
1235 &format!("__hydro_deploy_{}_sink", &shared_handle),
1236 Span::call_site(),
1237 );
1238
1239 let bind_addr = format!("0.0.0.0:{}", p2_port);
1240
1241 extra_stmts.push(syn::parse_quote! {
1242 let #socket_ident = tokio::net::TcpListener::bind(#bind_addr).await.unwrap();
1243 });
1244
1245 let create_expr = deploy_containerized_external_sink_source_ident(socket_ident);
1246
1247 extra_stmts.push(syn::parse_quote! {
1248 let (#sink_ident, #source_ident) = (#create_expr).split();
1249 });
1250
1251 parse_quote!(#source_ident)
1252 }
1253
1254 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, ?many, ?server_hint))]
1255 fn e2o_connect(
1256 p1: &Self::External,
1257 p1_port: &<Self::External as Node>::Port,
1258 p2: &Self::Process,
1259 p2_port: &<Self::Process as Node>::Port,
1260 many: bool,
1261 server_hint: NetworkHint,
1262 ) -> Box<dyn FnOnce()> {
1263 if server_hint != NetworkHint::Auto {
1264 panic!(
1265 "Docker deployment only supports NetworkHint::Auto, got {:?}",
1266 server_hint
1267 );
1268 }
1269
1270 if many {
1272 p1.connection_info.borrow_mut().insert(
1273 *p1_port,
1274 (
1275 p2.docker_container_name.clone(),
1276 *p2_port,
1277 p2.network.clone(),
1278 ),
1279 );
1280 }
1281
1282 let serialized = format!("e2o_connect {}:{p1_port} -> {}:{p2_port}", p1.name, p2.name);
1283
1284 Box::new(move || {
1285 trace!(name: "e2o_connect thunk", %serialized);
1286 })
1287 }
1288
1289 #[instrument(level = "trace", skip_all, fields(p1 = p1.name, %p1_port, p2 = p2.name, %p2_port, %shared_handle))]
1290 fn o2e_sink(
1291 p1: &Self::Process,
1292 p1_port: &<Self::Process as Node>::Port,
1293 p2: &Self::External,
1294 p2_port: &<Self::External as Node>::Port,
1295 shared_handle: String,
1296 ) -> syn::Expr {
1297 let sink_ident = syn::Ident::new(
1298 &format!("__hydro_deploy_{}_sink", &shared_handle),
1299 Span::call_site(),
1300 );
1301 parse_quote!(#sink_ident)
1302 }
1303
    /// Returns the quoted expression that yields the member ids of a cluster.
    ///
    /// `of_cluster` is only recorded on the tracing span; it is not passed on.
    #[instrument(level = "trace", skip_all, fields(%of_cluster))]
    fn cluster_ids(
        of_cluster: LocationKey,
    ) -> impl QuotedWithContext<'a, &'a [TaglessMemberId], ()> + Clone + 'a {
        // The unqualified call resolves to the module-level `cluster_ids` function, not to
        // this associated function (which would need a `Self::` prefix).
        // NOTE(review): presumably provided by the `deploy_runtime_containerized` glob import.
        cluster_ids()
    }
1310
    /// Returns the quoted expression that yields the current member's own id.
    #[instrument(level = "trace", skip_all)]
    fn cluster_self_id() -> impl QuotedWithContext<'a, TaglessMemberId, ()> + Clone + 'a {
        // The unqualified call resolves to the module-level `cluster_self_id` function, not
        // to this associated function (which would need a `Self::` prefix).
        cluster_self_id()
    }
1315
    /// Returns the quoted expression for a stream of `(member id, membership event)` pairs
    /// for the cluster identified by `location_id`.
    ///
    /// `_env` and `_at_location` are unused by this implementation.
    #[instrument(level = "trace", skip_all, fields(?location_id))]
    fn cluster_membership_stream(
        _env: &mut Self::InstantiateEnv,
        _at_location: &LocationId,
        location_id: &LocationId,
    ) -> impl QuotedWithContext<'a, Box<dyn Stream<Item = (TaglessMemberId, MembershipEvent)> + Unpin>, ()>
    {
        // The unqualified call resolves to the module-level `cluster_membership_stream`
        // function, not to this associated function (which would need a `Self::` prefix).
        cluster_membership_stream(location_id)
    }
1325}
1326
/// Alphabet for generated random name suffixes: the ten ASCII digits followed by the 26
/// ASCII lowercase letters.
///
/// `DockerNetwork::new` passes it to `nanoid!` to build its 6-character suffix.
const CONTAINER_ALPHABET: [char; 36] = {
    // Built from a byte string so the full character set is readable at a glance; a length
    // mismatch with the array type fails at compile time.
    let symbols = b"0123456789abcdefghijklmnopqrstuvwxyz";
    let mut alphabet = ['0'; 36];
    let mut idx = 0;
    while idx < alphabet.len() {
        alphabet[idx] = symbols[idx] as char;
        idx += 1;
    }
    alphabet
};
1331
/// Returns `true` when `name` is made of one or more non-empty runs of ASCII lowercase
/// letters and digits, with neighbouring runs separated by exactly one `.`, `_` or `-`.
///
/// This accepts exactly the strings matched by `^[a-z0-9]+([._-][a-z0-9]+)*$`. It is written
/// with plain string operations so that no regular expression has to be compiled (and
/// unwrapped) on every call.
fn is_valid_docker_image_name(name: &str) -> bool {
    // Splitting on the separators turns "leading", "trailing" and "doubled" separators into
    // empty segments, so rejecting empty segments covers all three cases (and the empty
    // string, which splits into a single empty segment).
    name.split(['.', '_', '-']).all(|segment| {
        !segment.is_empty()
            && segment
                .bytes()
                .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit())
    })
}
1337
1338#[instrument(level = "trace", skip_all, ret, fields(%name_hint, %location_key, %deployment_instance))]
1339fn get_docker_image_name(
1340 name_hint: &str,
1341 location_key: LocationKey,
1342 deployment_instance: &str,
1343) -> String {
1344 let name_hint: String = name_hint
1345 .split("::")
1346 .last()
1347 .unwrap()
1348 .to_ascii_lowercase()
1349 .split(['.', '_', '-'])
1350 .filter(|s| !s.is_empty())
1351 .collect::<Vec<_>>()
1352 .join("-");
1353
1354 let image_name = format!("hy-{name_hint}-{deployment_instance}-{location_key}");
1355
1356 if !is_valid_docker_image_name(&image_name) {
1357 panic!(
1358 "Generated Docker image name '{image_name}' is not a valid Docker image name. \
1359 Docker image names may only contain lowercase alphanumeric characters \
1360 separated by single '.', '_', or '-' characters, and must start and end \
1361 with an alphanumeric character. The most likely cause is your location \
1362 struct name '{name_hint}'"
1363 );
1364 }
1365
1366 image_name
1367}
1368
1369#[instrument(level = "trace", skip_all, ret, fields(%image_name, ?instance))]
1370fn get_docker_container_name(image_name: &str, instance: Option<usize>) -> String {
1371 if let Some(instance) = instance {
1372 format!("{image_name}-{instance}")
1373 } else {
1374 image_name.to_owned()
1375 }
1376}
#[derive(Clone)]
/// Settings from which a `DockerDeployProcess` is built (see this type's `ProcessSpec` impl).
pub struct DockerDeployProcessSpec {
    /// Forwarded unchanged to the built process.
    /// NOTE(review): looks like extra compiler options — confirm where it is consumed.
    compilation_options: Option<String>,
    /// Forwarded unchanged to the built process.
    /// NOTE(review): meaning of the entries is not visible here — confirm where it is consumed.
    config: Vec<String>,
    /// Docker network handed to the built process.
    network: DockerNetwork,
    /// Deployment instance identifier; becomes part of the generated image name.
    deployment_instance: String,
    /// Optional base image, set through `base_image`.
    base_image: Option<String>,
    /// Linux compile type, set through `linux_compile_type`.
    linux_compile_type: LinuxCompileType,
    /// Feature names accumulated through `features` / `feature`.
    /// NOTE(review): presumably Cargo features for the build — confirm where it is consumed.
    features: Vec<String>,
}
1388
1389impl<'a> ProcessSpec<'a, DockerDeploy> for DockerDeployProcessSpec {
1390 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1391 fn build(self, key: LocationKey, name_hint: &'_ str) -> <DockerDeploy as Deploy<'a>>::Process {
1392 DockerDeployProcess {
1393 key,
1394 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1395
1396 next_port: Rc::new(RefCell::new(1000)),
1397 rust_crate: Rc::new(RefCell::new(None)),
1398
1399 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1400
1401 docker_container_name: Rc::new(RefCell::new(None)),
1402
1403 compilation_options: self.compilation_options,
1404 config: self.config,
1405
1406 network: self.network.clone(),
1407
1408 base_image: self.base_image,
1409 linux_compile_type: self.linux_compile_type,
1410 features: self.features,
1411 }
1412 }
1413}
1414
#[derive(Clone)]
/// Settings from which a `DockerDeployCluster` is built (see this type's `ClusterSpec` impl).
pub struct DockerDeployClusterSpec {
    /// Forwarded unchanged to the built cluster.
    /// NOTE(review): looks like extra compiler options — confirm where it is consumed.
    compilation_options: Option<String>,
    /// Forwarded unchanged to the built cluster.
    /// NOTE(review): meaning of the entries is not visible here — confirm where it is consumed.
    config: Vec<String>,
    /// Forwarded unchanged to the built cluster.
    /// NOTE(review): presumably the number of cluster members — confirm where it is consumed.
    count: usize,
    /// Deployment instance identifier; becomes part of the generated image name.
    deployment_instance: String,
    /// Optional base image, set through `base_image`.
    base_image: Option<String>,
    /// Linux compile type, set through `linux_compile_type`.
    linux_compile_type: LinuxCompileType,
    /// Feature names accumulated through `features` / `feature`.
    /// NOTE(review): presumably Cargo features for the build — confirm where it is consumed.
    features: Vec<String>,
}
1426
1427impl<'a> ClusterSpec<'a, DockerDeploy> for DockerDeployClusterSpec {
1428 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1429 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::Cluster {
1430 DockerDeployCluster {
1431 key,
1432 name: get_docker_image_name(name_hint, key, &self.deployment_instance),
1433
1434 next_port: Rc::new(RefCell::new(1000)),
1435 rust_crate: Rc::new(RefCell::new(None)),
1436
1437 exposed_ports: Rc::new(RefCell::new(Vec::new())),
1438
1439 docker_container_name: Rc::new(RefCell::new(Vec::new())),
1440
1441 compilation_options: self.compilation_options,
1442 config: self.config,
1443
1444 count: self.count,
1445
1446 base_image: self.base_image,
1447 linux_compile_type: self.linux_compile_type,
1448 features: self.features,
1449 }
1450 }
1451}
1452
1453impl DockerDeployProcessSpec {
1454 pub fn base_image(mut self, image: impl Into<String>) -> Self {
1457 self.base_image = Some(image.into());
1458 self
1459 }
1460
1461 pub fn linux_compile_type(mut self, compile_type: LinuxCompileType) -> Self {
1464 self.linux_compile_type = compile_type;
1465 self
1466 }
1467
1468 pub fn features(mut self, features: impl IntoIterator<Item = impl Into<String>>) -> Self {
1470 self.features.extend(features.into_iter().map(Into::into));
1471 self
1472 }
1473
1474 pub fn feature(mut self, feature: impl Into<String>) -> Self {
1476 self.features.push(feature.into());
1477 self
1478 }
1479}
1480
1481impl DockerDeployClusterSpec {
1482 pub fn base_image(mut self, image: impl Into<String>) -> Self {
1485 self.base_image = Some(image.into());
1486 self
1487 }
1488
1489 pub fn linux_compile_type(mut self, compile_type: LinuxCompileType) -> Self {
1492 self.linux_compile_type = compile_type;
1493 self
1494 }
1495
1496 pub fn features(mut self, features: impl IntoIterator<Item = impl Into<String>>) -> Self {
1498 self.features.extend(features.into_iter().map(Into::into));
1499 self
1500 }
1501
1502 pub fn feature(mut self, feature: impl Into<String>) -> Self {
1504 self.features.push(feature.into());
1505 self
1506 }
1507}
1508
/// Settings from which a `DockerDeployExternal` is built (see this type's `ExternalSpec` impl).
pub struct DockerDeployExternalSpec {
    /// Name given to the built external; used as-is (the build step's `name_hint` does not
    /// affect it).
    name: String,
}
1514impl<'a> ExternalSpec<'a, DockerDeploy> for DockerDeployExternalSpec {
1515 #[instrument(level = "trace", skip_all, fields(%key, %name_hint))]
1516 fn build(self, key: LocationKey, name_hint: &str) -> <DockerDeploy as Deploy<'a>>::External {
1517 DockerDeployExternal {
1518 key,
1519 name: self.name,
1520 next_port: Rc::new(RefCell::new(10000)),
1521 next_external_port_id: Rc::new(RefCell::new(crate::Counter::default())),
1522 ports: Rc::new(RefCell::new(HashMap::new())),
1523 connection_info: Rc::new(RefCell::new(HashMap::new())),
1524 }
1525 }
1526}